# acquire

In [1]:
import pyspark
from pyspark.sql.types import *

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.types import *

In [2]:
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

## Exercises

_Using case.csv & dept.csv:_

1. Read both files into the Spark environment (df_case, df_dept).

In [4]:
# Load both CSV extracts as Spark DataFrames, using the first row as column names.
df_case = spark.read.load('./sa311/case.csv', format='csv', header=True)
df_dept = spark.read.load('./sa311/dept.csv', format='csv', header=True)

In [None]:
# Alternatively, the explicit reader options below can load any delimited file:
    
# df = spark.read.format("csv").\
#     option("sep", ",").\
#     option("header", True).\
#     option("inferSchema", True).\
#     load("sa311/source.csv")

In [5]:
# Column names of the department DataFrame read from dept.csv.
df_dept.columns

['dept_division', 'dept_name', 'standardized_dept_name', 'dept_subject_to_SLA']

In [6]:
# Print each department column with its type and nullability.
df_dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [7]:
# Column names of the case DataFrame read from case.csv.
df_case.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district']

In [8]:
# Print each case column with its type and nullability.
df_case.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: string (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: string (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



2. Write df_case and df_dept back to disk into their own directories (my_cases and my_depts)

In [12]:
# Persist the cases as CSV with a header row under my_cases/, replacing any earlier output.
df_case.write.save('my_cases', format='csv', mode='overwrite', header='true')

In [13]:
# Persist the departments as CSV with a header row under my_depts/, replacing any earlier output.
df_dept.write.save('my_depts', format='csv', mode='overwrite', header='true')

3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)

In [14]:
# Write the cases to Parquet, replacing any earlier output.
# 'header' is a CSV option; Parquet stores column names and types in the file
# itself, so the option is dropped here.
df_case.write.format('parquet').mode('overwrite').save('my_cases_parquet')

In [15]:
# Write the departments to Parquet, replacing any earlier output.
# 'header' is a CSV option; Parquet stores column names and types in the file
# itself, so the option is dropped here.
df_dept.write.format('parquet').mode('overwrite').save('my_depts_parquet')

4. Read your parquet files back into your spark environment.

In [16]:
# Read the Parquet copy of the cases back into Spark.
df_case_pq = spark.read.load('./my_cases_parquet', format='parquet')

In [17]:
# Read the Parquet copy of the departments back into Spark.
df_depts_pq = spark.read.load('./my_depts_parquet', format='parquet')

In [18]:
# Show the column/type summary of both Parquet-backed DataFrames, one per line.
print(df_depts_pq, df_case_pq, sep='\n')

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]
DataFrame[case_id: string, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: string, case_closed: string, dept_division: string, service_request_type: string, SLA_days: string, case_status: string, source_id: string, request_address: string, council_district: string]


5. Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)

In [22]:
# Read the cases into a pandas DataFrame. pandas is already imported in the
# first cell, so it is not re-imported here; a comma is read_csv's default separator.
# NOTE(review): the Spark cells read './sa311/case.csv' while this reads
# 'case.csv' from the working directory — confirm which path is intended.
# NOTE(review): the preview below shows an 'Unnamed: 0' column, which suggests
# this file carries a saved index; pass index_col=0 if that column is unwanted.
cases_pdf = pd.read_csv('case.csv')

In [23]:
# Preview the first rows of the pandas cases frame.
cases_pdf.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [25]:
# Read the departments into a pandas DataFrame (comma-separated, the read_csv default).
depts_pdf = pd.read_csv('dept.csv')

In [27]:
# Preview the first rows of the pandas departments frame.
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)

## Note: this question won't work as written (per Zach). Skip to the next question.

In [None]:
# Build a Spark DataFrame from the pandas cases frame read above.
cases_sdf = spark.createDataFrame(cases_pdf)

In [None]:
# Display the first five rows of the converted cases DataFrame.
cases_sdf.show(5)

In [None]:
# Build a Spark DataFrame from the pandas departments frame.
# The source must be the pandas frame (depts_pdf): df_dept is already a Spark
# DataFrame, which createDataFrame does not accept as input.
depts_sdf = spark.createDataFrame(depts_pdf)

In [None]:
# Preview the first five rows of each converted Spark DataFrame.
cases_sdf.show(n=5)
depts_sdf.show(n=5)

# df_spark = spark.createDataFrame(df_pd)
# df_spark.show(5)

7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)

8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.

9. Explore the Hive database/tables you have created using the methods in the lesson.

10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)