In [1]:
__author__ = "Yasaman Emami"
__email__ = ['emami.yasamann@gmail.com','yasaman.emami@sjsu.edu']
__sid__ = "015325557"

## Reading data files as tsv and union all to make one dataframe

In [1]:
from pyspark.sql import SparkSession
  
spark = SparkSession.builder.getOrCreate()
data16 = spark.read.csv("dataset2016.tsv", sep=r'\t', header=False)
data17 = spark.read.csv("dataset2017.tsv", sep=r'\t', header=False)
data18 = spark.read.csv("dataset2018.tsv", sep=r'\t', header=False)
data = data16.union(data17).union(data18)

data.show(2)

21/10/22 22:37:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/22 22:38:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


+----+--------------------+---------------+-------------+---+-----+---------+-----------+--------------------+-----------+-------------+-------------------+----+--------------------+--------------------+--------------------+--------+
| _c0|                 _c1|            _c2|          _c3|_c4|  _c5|      _c6|        _c7|                 _c8|        _c9|         _c10|               _c11|_c12|                _c13|                _c14|                _c15|    _c16|
+----+--------------------+---------------+-------------+---+-----+---------+-----------+--------------------+-----------+-------------+-------------------+----+--------------------+--------------------+--------------------+--------+
|2764|            AL-HAMRA|   3083 16th St|San Francisco| CA|94103|37.764913|-122.421349|{'needs_recoding'...|       null|2764_20160106|2016-01-06T00:00:00|null|           Complaint|                null|                null|    null|
|1154|SUNFLOWER RESTAURANT|506 Valencia St|San Francisco| CA|941

In [3]:
data.count()

51731

In [4]:
#change the name of columns to names related to the values
data_with_header = data.selectExpr("_c0 as restaurant_id", "_c1 as restaurant_name","_c2 as restaurant_address",
                                "_c3 as restaurant_city","_c4 as restaurant_state","_c5 as restaurant_zipcode",
                                "_c6 as latitude","_c7 as longitude",
                                "_c8 as record_dict","_c9 as number","_c10 as id_start_date","_c11 as start_date",
                                "_c12 as rating","_c13 as schedule","_c14 as second_id","_c15 as required_inspect",
                                "_c16 as risk_level")
#data_col_name.schema.names

## show descriptive statistics for numerical columns

In [5]:
data_with_header.describe("latitude").show()

                                                                                

+-------+------------------+
|summary|          latitude|
+-------+------------------+
|  count|             29189|
|   mean|37.743478438658606|
| stddev|1.0368811059679102|
|    min|                 0|
|    max|         37.824494|
+-------+------------------+



In [6]:
data_with_header.describe("longitude").show()

+-------+-------------------+
|summary|          longitude|
+-------+-------------------+
|  count|              29189|
|   mean|-122.33581876076603|
| stddev|  3.360015099603407|
|    min|        -122.368257|
|    max|                  0|
+-------+-------------------+



In [7]:
data_with_header.describe("number").show()

+-------+--------------------+
|summary|              number|
+-------+--------------------+
|  count|               15967|
|   mean|1.415540227462923...|
| stddev|  1226179.3764345855|
|    min|         14150204876|
|    max|         14159881393|
+-------+--------------------+



In [8]:
data_with_header.describe("rating").show()

+-------+-----------------+
|summary|           rating|
+-------+-----------------+
|  count|            38200|
|   mean|85.94552356020942|
| stddev|8.780569238373296|
|    min|              100|
|    max|               98|
+-------+-----------------+



In [9]:
#creating new df from numerical column values
df_numerical_cols = data_with_header.selectExpr("cast(restaurant_id as int) restaurant_id",
                                                "cast(restaurant_zipcode as int) restaurant_zipcode",
                                                "cast(latitude as float) latitude",
                                                "cast(longitude as float) longitude",
                                                "cast(number as bigint) number",
                                                "cast(rating as float) rating",
                                               )

#df_numerical_cols.printSchema()
#df_numerical_cols.show()

## drop duplicates and fill null values for numerical columns with mean

In [10]:
from pyspark.sql import Row

#drop duplicate rows in df
data_without_duplicates = data_with_header.dropDuplicates()

#data_without_duplicates.count()

In [11]:
#showing null counts in each column
from pyspark.sql.functions import isnan, when, count, col

df_numerical_cols.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_numerical_cols.columns]).show()

+-------------+------------------+--------+---------+------+------+
|restaurant_id|restaurant_zipcode|latitude|longitude|number|rating|
+-------------+------------------+--------+---------+------+------+
|            0|              1256|   22542|    22542| 35764| 13531|
+-------------+------------------+--------+---------+------+------+



In [12]:
#replacing null zipcode with the avg value
zipcode_mean = df_numerical_cols.agg({'restaurant_zipcode': 'mean'}).take(1)[0][0]
df1 = df_numerical_cols.na.fill(zipcode_mean)

In [13]:
#replacing null latitude with the avg value
latitude_mean = df1.agg({'latitude': 'mean'}).take(1)[0][0]
df2 = df1.na.fill(latitude_mean)

In [14]:
#replacing null longitude with the avg value
longitude_mean = df2.agg({'longitude': 'mean'}).take(1)[0][0]
df3 = df2.na.fill(longitude_mean)

In [15]:
#replacing null number with the avg value
number_mean = df3.agg({'number': 'mean'}).take(1)[0][0]
df4 = df3.na.fill(number_mean)

In [16]:
#replacing null rating with the avg value
rating_mean = df4.agg({'rating': 'mean'}).take(1)[0][0]
df5 = df4.na.fill(rating_mean)

In [17]:
#checking if the null values has been filled
from pyspark.sql.functions import isnan, when, count, col

df5.select(*(count(when(col(c).isNull(), c)).alias(c) for c in df5.columns)).show()

+-------------+------------------+--------+---------+------+------+
|restaurant_id|restaurant_zipcode|latitude|longitude|number|rating|
+-------------+------------------+--------+---------+------+------+
|            0|                 0|       0|        0|     0|     0|
+-------------+------------------+--------+---------+------+------+



## display correlation matrix in two different ways

In [18]:
#finding correlation matrix approach 1
corr_matrix =[]

df_num_len = len(df_numerical_cols.columns)

for i in df_numerical_cols.columns:
    for j in df_numerical_cols.columns:
        corr_matrix.append(df_numerical_cols.corr(i,j,method="pearson"))

print(corr_matrix)


[1.0, -0.01948022584120647, -0.7971770861712276, 0.7971902932030105, 0.017755279693359438, -0.12813656738625975, -0.01948022584120647, 1.0, 0.0006860162285308216, -0.0006890228463521726, 0.009037958486648222, -0.0007724602081661724, -0.7971770861712277, 0.0006860162285308205, 1.0, -0.999999393525567, -0.1273698092054171, 0.13218872540238097, 0.7971902932030105, -0.0006890228463521711, -0.999999393525567, 1.0, 0.1271932569380599, -0.13218753277124973, 0.01775527969335945, 0.009037958486648219, -0.12736980920541716, 0.1271932569380599, 1.0, -0.013449288746996869, -0.12813656738625975, -0.0007724602081661749, 0.13218872540238097, -0.1321875327712497, -0.013449288746996885, 1.0]


In [19]:
#finding corr matrix approach2
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# convert to vector column first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df5.columns, outputCol=vector_col)
df_vector = assembler.transform(df5).select(vector_col)

# get correlation matrix
matrix = Correlation.corr(df_vector, vector_col)
matrix.show(truncate =False)

21/10/13 15:17:18 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/13 15:17:18 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pearson(corr_features)                                                                                                                                                                                                                                                                                               |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0                   -0.01919108505535854    ... (6 total)
-0.

In [20]:
clean_data_zipcode_query = data_with_header.selectExpr("cast(restaurant_id as int) restaurant_id",
                                                        "cast(restaurant_zipcode as int) restaurant_zipcode",
                                                        "cast(latitude as float) latitude",
                                                        "cast(longitude as float) longitude",
                                                        "cast(number as bigint) number",
                                                        "cast(rating as float) rating",
                                                        "cast(restaurant_name as string) restaurant_name")
orig_df1 = clean_data_zipcode_query.na.fill(zipcode_mean)
orig_df2 = orig_df1.na.fill(latitude_mean)
orig_df3 = orig_df2.na.fill(longitude_mean)
orig_df4 = orig_df3.na.fill(number_mean)
orig_df5 = orig_df4.na.fill(rating_mean)

## showing number of distinct restaurants in each zipcode

In [23]:
from pyspark.sql import SparkSession

sprk = SparkSession.builder.getOrCreate()

orig_df5.createOrReplaceTempView("final")
sqlCount3 = sprk.sql("SELECT  restaurant_zipcode, count(distinct restaurant_name) FROM final where restaurant_zipcode<99999 group by restaurant_zipcode")
sqlCount3.show()



+------------------+-------------------------------+
|restaurant_zipcode|count(DISTINCT restaurant_name)|
+------------------+-------------------------------+
|             94621|                              1|
|             94109|                            377|
|             94402|                              1|
|             94901|                              1|
|             94115|                            222|
|             95122|                              1|
|             94112|                            189|
|             94127|                             68|
|             94013|                              2|
|             94108|                            215|
|             94121|                            150|
|             94101|                              2|
|             94105|                            211|
|             94120|                              1|
|             94131|                             49|
|             92672|                          

                                                                                