# In [0]:
import pyspark.sql.types as typ
from pyspark.sql.types import *
from pyspark import SparkContext
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc=SparkContext.getOrCreate()
import pandas as pd

# data2016FilePath ='dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2016.xlsx'
# data2017FilePath='dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2017.csv'
# data2018FilePath='dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2018.csv'

# CSV exports that are read together into a single DataFrame.
paths = [
    'dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2016-2.csv',
    'dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2017.csv',
    'dbfs:/FileStore/shared_uploads/deepali.zutshi@sjsu.edu/data2018.csv',
]
# No header option is set, so columns are addressed below by Spark's default
# positional names (_c0, _c1, ...); column types are inferred from the data.
reader = spark.read
data = reader.csv(path=paths, sep=',', inferSchema='True')

# Discard the trailing columns that the rest of the script never uses.
columns_to_drop = ['_c17', '_c18', '_c19']
data = data.drop(*columns_to_drop)

#rename columns (for display only):
# `data` itself keeps the default _cN names because every later statement in
# this script refers to the columns by those names.
column_names = {
    "_c0": "ID",
    "_c1": "Name",
    "_c2": "Address",
    "_c3": "City",
    "_c4": "State",
    "_c5": "Zip",
    "_c6": "Latitude",
    "_c7": "Longitude",
    "_c8": "Summary",
    "_c9": "Serial",
    "_c10": "Number_Date",
    "_c11": "Date_Time",
    "_c12": "Score",
    "_c13": "Inspection_Type",
    "_c14": "Inspection_Time",
    # These two keys were previously spelled "c_15"/"c_16". withColumnRenamed
    # silently ignores a column that does not exist, so the last two columns
    # were never renamed in the displayed table.
    "_c15": "Comment",
    "_c16": "Risk_Grade",
}
renamed = data
for old_name, new_name in column_names.items():
    renamed = renamed.withColumnRenamed(old_name, new_name)
renamed.show()
#data.printSchema()

#cast selected columns to integer / float types:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType

# (column, target type) pairs, applied one after another in this order.
numeric_casts = [
    ("_c0", IntegerType()),
    ("_c5", IntegerType()),
    ("_c6", FloatType()),
    ("_c7", FloatType()),
    ("_c9", FloatType()),
    ("_c12", IntegerType()),
]
for column_name, target_type in numeric_casts:
    # Replacing a column under its own name keeps the column order intact.
    data = data.withColumn(column_name, col(column_name).cast(target_type))

#summary statistics for a subset of the numeric columns
cols = ["_c6", "_c7", "_c9", "_c12"]
print("Descriptive Statistics: ")
description = data.describe(*cols)
description.show()

#Pairwise correlations, in the same order as the labels printed below:
column_pairs = [
    ('_c6', '_c7'),
    ('_c6', '_c9'),
    ('_c6', '_c12'),
    ('_c7', '_c9'),
    ('_c7', '_c12'),
    ('_c9', '_c12'),
]
corr = [data.corr(first, second) for first, second in column_pairs]
print("Correlations between: [Latitude:Longitude],[Latitude:InspectionID],[Latitude:InspectionScore],[Longitude:InspectionID],[Longitude:InspectionScore],[InspectionID:InspectionScore]")
print(corr)

#report how many rows are exact duplicates, then remove them
total_rows = data.count()
print('Count of rows: {0}'.format(total_rows))
distinct_rows = data.distinct().count()
print('Count of distinct rows: {0}'.format(distinct_rows))
data = data.dropDuplicates()

#Drop rows that match another row in every column except _c0
non_id_columns = [name for name in data.columns if name != '_c0']
print('Count of ids: {0}'.format(data.count()))
distinct_without_id = data.select(non_id_columns).distinct().count()
print('Count of distinct restaurant ids: {0}'.format(distinct_without_id))
data = data.dropDuplicates(subset=non_id_columns)

#missing data
print("Replacing 'null/na' with the mean values: ")
# Compute the means from the data instead of hard-coding them. The constants
# used previously could not have been the column means (for example 4122.33
# for _c7, which the rename step labels Longitude), and any fixed value goes
# stale as soon as the input files or the de-duplication change.
mean_columns = ['_c6', '_c7', '_c9', '_c12']
mean_row = data.groupBy().mean(*mean_columns).first()
# A column with no non-null values has no mean (None); leave it out, since
# None is not a usable replacement value for fillna.
mean_values = {
    name: mean
    for name, mean in zip(mean_columns, mean_row)
    if mean is not None
}
# Display only: `data` itself is not modified here, same as before.
data.fillna(mean_values).show()

#number of non-null _c1 values per distinct _c5 value
from pyspark.sql.functions import *
print("Count of restaurants in each zip code: ")
total_per_zip = count("_c1").alias("Total Restaurants")
by_zip = data.groupBy("_c5")
restaurants = by_zip.agg(total_per_zip)
restaurants.show()