# Exercise

Using case.csv & dept.csv:  
1. read into spark environment (df_case, df_dept)  
2. write df_case and df_dept back to disk into their own directories (my_cases and my_depts)  
3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)  
4. Read your parquet files back into your spark environment.  
5. Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)  
6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)  
7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)  
8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.   
9. Explore the Hive database/tables you have created using the methods in the lesson.  
10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf) 

In [177]:
import pyspark

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import uuid   # Create unique table name

In [60]:
spark = SparkSession.builder.master("local").appName("read").\
    enableHiveSupport().\
    getOrCreate()

In [69]:
df_case = spark.read.csv("sa311/case.csv", sep=",",
                    header=True, inferSchema=True)
df_dept = spark.read.csv("sa311/dept.csv", sep=",",
                    header=True, inferSchema=True)

In [71]:
df_case.toPandas()  

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.000000,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125000,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7
5,1014127337,1/1/18 6:28,1/1/18 14:38,1/31/18 8:30,NO,-29.743981,YES,Signals,Traffic Signal Ops and Maintenance,30.084468,Closed,svcCRMSS,BANDERA RD and BRESNAHAN,7
6,1014127338,1/1/18 6:57,1/2/18 15:32,1/17/18 8:30,NO,-14.706736,YES,Code Enforcement,Front Or Side Yard Parking,16.064294,Closed,svcCRMSS,"10133 FIGARO CANYON, San Antonio, 78251",4
7,1014127339,1/1/18 6:58,1/2/18 15:32,1/17/18 8:30,NO,-14.706620,YES,Code Enforcement,Front Or Side Yard Parking,16.063796,Closed,svcCRMSS,"10133 FIGARO CANYON, San Antonio, 78251",4
8,1014127340,1/1/18 6:58,1/2/18 15:32,1/17/18 8:30,NO,-14.706620,YES,Code Enforcement,Right Of Way/Sidewalk Obstruction,16.063333,Closed,svcCRMSS,"10133 FIGARO CANYON, San Antonio, 78251",4
9,1014127341,1/1/18 6:59,1/2/18 15:32,1/17/18 8:30,NO,-14.706493,YES,Code Enforcement,Front Or Side Yard Parking,16.062859,Closed,svcCRMSS,"10133 FIGARO CANYON, San Antonio, 78251",4


In [64]:
df = spark.read.format("csv").\
    option("sep", ",").\
    option("header", True).\
    option("inferSchema", True).\
    load("sa311/source.csv")

df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [65]:
df.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [68]:
schema = StructType([
    StructField("source_id", StringType()),
    StructField("source_username", StringType())
])

In [137]:
df_case.write.format('csv').mode("overwrite").\
    option("header","true").save("sa311/my_cases")

In [138]:
df_dept.write.format('csv').mode("overwrite").\
    option("header","true").save("sa311/my_depts")

In [139]:
df_case.write.format('parquet').mode('overwrite').\
    option('header','true').save('sa311/my_cases_parquet')

In [140]:
df_dept.write.format('parquet').mode('overwrite').\
    option('header','true').save('sa311/my_depts_parquet')

In [141]:
df_cases_parquet = spark.read.parquet("sa311/my_cases_parquet")
df_depts_parquet = spark.read.parquet("sa311/my_depts_parquet")

In [171]:
cases_pdf = pd.read_csv("sa311/case.csv", sep=",").dropna()
cases_pdf = cases_pdf.head()

In [172]:
depts_pdf = pd.read_csv("sa311/dept.csv", sep=",").dropna()
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


In [173]:
cases_sdf = spark.createDataFrame(cases_pdf)
cases_sdf.show(5)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

In [166]:
df_dept_pd.isnull().sum()

dept_division             0
dept_name                 1
standardized_dept_name    0
dept_subject_to_SLA       0
dtype: int64

In [174]:
depts_sdf = spark.createDataFrame(depts_pdf)
depts_sdf.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [175]:
cases_pdf1 = cases_sdf.toPandas()
cases_pdf1.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [176]:
depts_pdf1 = depts_sdf.toPandas()
depts_pdf1.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


In [186]:
cases_table = "df_" + str(uuid.uuid4().hex)  
cases_sdf.write.saveAsTable(cases_table)

In [187]:
depts_table = "df_" + str(uuid.uuid4().hex)  
depts_sdf.write.saveAsTable(depts_table)

In [188]:
query = "DESCRIBE %s" % cases_table
spark.sql(query).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   bigint|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   bigint|   null|
+--------------------+---------+-------+



In [189]:
query = "DESCRIBE %s" % depts_table
spark.sql(query).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf) 

In [190]:
spark.sql("SHOW DATABASES").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [191]:
spark.sql("USE default")
spark.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|df_2216a1f6aa8848...|      false|
| default|df_4c5a61b1ed834b...|      false|
| default|df_6eb0c9bf0d1c4d...|      false|
| default|df_d9e46cf4b15249...|      false|
| default|df_ea8e6cd5a9fb46...|      false|
+--------+--------------------+-----------+



In [194]:
spark.sql(f"DESCRIBE {cases_table}").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   bigint|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   bigint|   null|
+--------------------+---------+-------+



In [195]:
spark.sql(f"SELECT * FROM {depts_table} LIMIT 10").show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Director's Office...|Trans & Cap Impro...|

In [196]:
cases_sdf1 = spark.sql(f"SELECT * FROM {cases_table}")
cases_sdf1.show(5)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

In [197]:
depts_sdf1 = spark.sql(f"SELECT * FROM {depts_table}")
depts_sdf1.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows

