In [1]:
import multiprocessing
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import col, column, expr
import pyspark.sql.types as T
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

nprocs = multiprocessing.cpu_count()

# master('local') would run Spark with a single worker thread, leaving the
# detected cores (and the nprocs shuffle partitions below) without parallelism.
# 'local[N]' runs N worker threads in the driver JVM.
spark = (pyspark.sql.SparkSession.builder
 .master('local[{}]'.format(nprocs))
 .config('spark.jars.packages', 'mysql:mysql-connector-java:8.0.16')
 .config('spark.driver.memory', '4G')
 # NOTE(review): spark.driver.cores only applies in cluster deploy mode; in
 # local mode the thread count comes from master('local[N]') above.
 .config('spark.driver.cores', nprocs)
 .config('spark.sql.shuffle.partitions', nprocs)
 .appName('MySparkApplication')
 .getOrCreate())

# Exploration only: peek at the reader methods. Only the last bare expression in
# a cell is rendered, so the output shows just the parquet reader method.
spark.read.json
spark.read.parquet

<bound method DataFrameReader.parquet of <pyspark.sql.readwriter.DataFrameReader object at 0x106f14c88>>

### Exercises
Using case.csv & dept.csv:

Read them into the Spark environment (df_case, df_dept).

Write df_case and df_dept back to disk, each into its own directory (my_cases and my_depts).

Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)

Read your parquet files back into your spark environment.

Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)

Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)

Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)



In [2]:
# Explicit schema for case.csv, for use instead of inferSchema.
# Apply it with `.schema(case_schema)` on the reader (NOT `.option('schema', ...)`,
# which the CSV source ignores), together with
# `.option('timestampFormat', 'M/d/yy H:mm')` so values such as '1/1/18 0:42' parse.
case_schema = T.StructType([
    T.StructField("case_id", T.StringType()),
    # Must match the CSV header name (case_opened_date), because with a
    # user-supplied schema the DataFrame takes its column names from here.
    T.StructField("case_opened_date", T.TimestampType()),
    # The date columns include a time of day ('1/1/18 12:29'), so a DateType
    # would not parse them; read them as timestamps.
    T.StructField("case_closed_date", T.TimestampType()),
    T.StructField("SLA_due_date", T.TimestampType()),
    # YES/NO flags: Spark's CSV boolean parser only accepts true/false, so keep
    # these as strings and convert them explicitly later.
    T.StructField("case_late", T.StringType()),
    # Double matches what inferSchema produces and avoids float precision loss.
    T.StructField("num_days_late", T.DoubleType()),
    T.StructField("case_closed", T.StringType()),
    T.StructField("dept_division", T.StringType()),
    T.StructField("service_request_type", T.StringType()),
    T.StructField("SLA_days", T.DoubleType()),
    T.StructField("case_status", T.StringType()),
    T.StructField("source_id", T.StringType()),
    T.StructField("request_address", T.StringType()),
    T.StructField("council_district", T.StringType())
])

In [3]:
# Load the 311 cases; the first row is the header and Spark infers column types.
# (An explicit schema would be supplied with `.schema(...)` rather than an option.)
df_case = spark.read.csv('./sa311/case.csv', header=True, inferSchema=True)

In [4]:
# Load the department lookup table with a header row and inferred column types.
df_dept = spark.read.csv('./sa311/dept.csv', header=True, inferSchema=True)

In [5]:
# Load the request-source lookup table with a header row and inferred column types.
df_source = spark.read.csv('./sa311/source.csv', header=True, inferSchema=True)

In [6]:
df_case.show(n=3)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

### Write df_case and df_dept back to disk, each into its own directory (my_cases and my_depts)

### Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)

In [7]:
# Spark's built-in CSV writer replaces the legacy 'com.databricks.spark.csv' alias.
df_case.write.csv('./sa311/my_cases', mode='overwrite', header=True)

In [8]:
# Spark's built-in CSV writer replaces the legacy 'com.databricks.spark.csv' alias.
df_dept.write.csv('./sa311/my_depts', mode='overwrite', header=True)

In [9]:
# Parquet stores the schema (column names and types) in the file itself, so the
# CSV-only 'header' option was a no-op and is dropped.
df_case.write.parquet('./sa311/my_cases_parquet', mode='overwrite')

In [10]:
# Parquet stores the schema in the file itself, so the CSV-only 'header' option
# was a no-op and is dropped.
# NOTE(review): the exercise names this directory my_depts_parquet; the path is
# left as-is because the read-back cell loads ./sa311/my_dept_parquet.
df_dept.write.parquet('./sa311/my_dept_parquet', mode='overwrite')

### Read your parquet files back into your spark environment.

### Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)

In [11]:
# Parquet carries its own schema and has no header row, so the CSV options
# 'header' and 'inferSchema' had no effect here; read the files directly.
cases_parquet_df = spark.read.parquet("./sa311/my_cases_parquet")

dept_parquet_df = spark.read.parquet("./sa311/my_dept_parquet")

In [12]:
cases_parquet_df.first()

Row(case_id=1014127332, case_opened_date='1/1/18 0:42', case_closed_date='1/1/18 12:29', SLA_due_date='9/26/20 0:42', case_late='NO', num_days_late=-998.5087616000001, case_closed='YES', dept_division='Field Operations', service_request_type='Stray Animal', SLA_days=999.0, case_status='Closed', source_id='svcCRMLS', request_address='2315  EL PASO ST, San Antonio, 78207', council_district=5)

### Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)

### Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)

In [13]:
# Same files, loaded eagerly into pandas (comma is read_csv's default separator).
cases_pdf = pd.read_csv('./sa311/case.csv')
depts_pdf = pd.read_csv('./sa311/dept.csv')

In [14]:
# Before converting to Spark, remove incomplete rows.
# spark.createDataFrame infers each column's type from the pandas values. A
# missing value in a pandas text column is a float NaN, which can conflict with
# the strings in the same column during inference, so rows with gaps are dropped.
# pd.read_csv already turns empty fields into NaN. The regex below also treats
# whitespace-only strings of ANY length as missing (the previous version only
# caught exactly '' and ' ').
BLANK_PATTERN = r'^\s*$'


def drop_blank_rows(pdf):
    """Return a copy of ``pdf`` with whitespace-only strings treated as missing
    and every row that contains a missing value removed."""
    return (pdf
            .replace(BLANK_PATTERN, np.nan, regex=True)
            .dropna(how='any', axis=0))


cases_pdf = drop_blank_rows(cases_pdf)
depts_pdf = drop_blank_rows(depts_pdf)


In [15]:
# Hand each cleaned pandas frame to Spark (cases first, then departments).
cases_sdf, depts_sdf = (spark.createDataFrame(pdf) for pdf in (cases_pdf, depts_pdf))

In [16]:
cases_sdf.first()

Row(case_id=1014127332, case_opened_date='1/1/18 0:42', case_closed_date='1/1/18 12:29', SLA_due_date='9/26/20 0:42', case_late='NO', num_days_late=-998.5087616, case_closed='YES', dept_division='Field Operations', service_request_type='Stray Animal', SLA_days=999.0, case_status='Closed', source_id='svcCRMLS', request_address='2315  EL PASO ST, San Antonio, 78207', council_district=5)

### Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)

In [17]:
# Collect both Spark frames back to the driver as pandas frames.
cases_pdf1, depts_pdf1 = (sdf.toPandas() for sdf in (cases_sdf, depts_sdf))

### Inspect

Read the 311 case data into a Spark DataFrame.
Inspect the DataFrame. Are the data types for each column appropriate? Cast the data to appropriate types as needed.


In [71]:
# Note that this star import shadows several Python built-ins with Spark column
# functions (e.g. sum, min, max, abs, round); after this cell those names refer
# to pyspark.sql.functions, not the built-ins.
# NOTE(review): prefer the qualified `F.` alias below; the star import is kept
# only because later cells may rely on the unqualified names.
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [121]:
# Fresh load of the 311 cases for the inspection section: header row, inferred types.
# (An explicit schema would be supplied with `.schema(...)` rather than an option.)
df_case = spark.read.csv('./sa311/case.csv', header=True, inferSchema=True)

In [122]:
from pyspark.sql.functions import desc

In [123]:
# Total number of rows in the raw case data.
row_count = df_case.count()
row_count

841704

In [124]:
# Show the column types Spark inferred from the CSV.
df_case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [125]:
# Keep only rows with no null in any column (the next cell shows the new count).
# NOTE(review): this also drops any case whose case_closed_date is empty, which
# may be every still-open case; consider a `subset=` if that matters.
df_case = df_case.dropna()

In [126]:
rows_after_drop = df_case.count()
rows_after_drop

823590

In [127]:
# The date strings look like '1/1/18 0:42' (month/day/two-digit year hour:minute).
# A plain .cast("timestamp") only understands ISO-style strings such as
# '2018-01-01 00:42', so it silently produced null for every value. Parse the
# strings with an explicit pattern instead.
CASE_DATE_FORMAT = 'M/d/yy H:mm'
CASE_DATE_COLUMNS = ['case_opened_date', 'case_closed_date', 'SLA_due_date']

spark_case_df = df_case
for date_col in CASE_DATE_COLUMNS:
    spark_case_df = spark_case_df.withColumn(
        date_col, F.to_timestamp(col(date_col), CASE_DATE_FORMAT))

In [128]:
# Confirm the three date columns are now timestamps.
spark_case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

