In [46]:
import pyspark
import pandas as pd
import uuid

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

read into spark environment (df_case, df_dept)


In [3]:
df_case = spark.read.csv('case.csv', sep=",",
            header=True, inferSchema=True)

df_dept = spark.read.csv("dept.csv", sep=",",
            header=True, inferSchema=True)

In [12]:
df_case.show(2)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [5]:
df_dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



write df_case and df_dept back to disk into their own directories (my_cases and my_depts)


In [6]:
df_case.write.csv('my_cases', header=True, mode='overwrite')

df_dept.write.csv('my_depts', header=True, mode='overwrite')

Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)


In [7]:
df_case.write.parquet('my_cases_parquet', mode='overwrite')

df_dept.write.parquet('my_depts_parquet', mode='overwrite')

Read your parquet files back into your spark environment.


In [8]:
df_case_pq = spark.read.parquet('my_cases_parquet')

df_dept_pq = spark.read.parquet('my_depts_parquet')


In [9]:
df_case_pq.show(2)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO| -0.077511574|        YES|Field Operations|     Officer Standby|   0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO| -2.743912037|        YES|Waste Collection|           No Pickup|3.801875|     Closed|   138793|1906  MOSSY CREEK..

In [10]:
df_dept_pq.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)


In [15]:
cases_pdf = pd.read_csv('case.csv')
depts_pdf = pd.read_csv('dept.csv')

In [42]:
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


In [43]:
depts_pdf.dtypes

dept_division             object
dept_name                 object
standardized_dept_name    object
dept_subject_to_SLA       object
dtype: object

Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)


In [44]:
from pyspark.sql.types import StructType, FloatType, StructField, StringType, NumericType, IntegerType, DataType, TimestampType

schema_cases = StructType([
    StructField('case_id', IntegerType()),
    StructField('case_opened_date', StringType()),
    StructField('case_closed_date', StringType()),
    StructField('SLA_due_date', StringType()),
    StructField('case_late', StringType()),
    StructField('num_days_late', FloatType()),
    StructField('case_closed', StringType()),
    StructField('dept_division', StringType()),
    StructField('service_request_type', StringType()),
    StructField('SLA_days', FloatType()),
    StructField('case_status', StringType()),
    StructField('source_id', StringType()),
    StructField('request_address', StringType()),
    StructField('council_district', IntegerType())
                    ])

schema_depts = StructType([
    StructField('dept_division', StringType()),
    StructField('dept_name', StringType()),
    StructField('standardized_dept_name', StringType()),
    StructField('dept_subject_to_SLA', StringType()),
                    ])

cases_sdf = spark.createDataFrame(cases_pdf, schema= schema_cases)
depts_sdf = spark.createDataFrame(depts_pdf, schema= schema_depts)


Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)


In [45]:
cases_pdf1 = cases_sdf.toPandas()
depts_pdf1 = depts_sdf.toPandas()

Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.


In [50]:
case_hive = "df_" + str(uuid.uuid4().hex)  
cases_sdf.write.saveAsTable(case_hive)

dept_hive = "df_" + str(uuid.uuid4().hex)  
depts_sdf.write.saveAsTable(dept_hive)

Explore the Hive database/tables you have created using the methods in the lesson.


In [53]:
case_hive_query = "DESCRIBE %s" % case_hive
spark.sql(case_hive_query).show()


+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|    float|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|    float|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



In [55]:
case_hive

'df_c0c8449fe9f749b280b19e4aeb00b046'

In [54]:
dept_hive_query = "DESCRIBE %s" % dept_hive
spark.sql(dept_hive_query).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



In [56]:
dept_hive

'df_f694e2cda8e74cde853e867d58d87391'

Read from the tables into two spark dataframes (cases_sdf, depts_sdf)


In [64]:
spark.sql("DESCRIBE %s" % case_hive).show()


+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|    float|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|    float|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



In [57]:
spark.sql("DESCRIBE %s" % dept_hive).show()


+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



In [61]:
case_hive

'df_c0c8449fe9f749b280b19e4aeb00b046'