## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
import pyspark

In [3]:
# File location and type
file_location = "/FileStore/tables/avocado.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files.                     For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
# display the data frame
display(df)

_c0,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [4]:
# Create a view or table
temp_table_name = "avocado_csv"
df.createOrReplaceTempView(temp_table_name)

In [5]:
%sql

/* Query the created temp table in a SQL cell */

select * from `avocado_csv`

_c0,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [6]:
import pandas as pd

pd_df = spark.sql("select * from `avocado_csv`").toPandas()
# checking for null values in the data
pd_df.isnull().sum()

In [7]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "avocado_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [8]:
# To display the columns in a file
df.columns

In [9]:
%sql
/* To display number of rows in a file */
select count(*) from avocado_csv

count(1)
18249


In [10]:
# Pre Processing of the data
# Removing the unnamed column
df = df.drop('_c0')

In [11]:
# Display the data frame
display(df)

Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [12]:
conv_df = spark.sql("select * from `avocado_csv` where type='conventional'")

display(conv_df)

_c0,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [13]:
#Renaming the coumns and updating dataframe 
#pd_df = df.withColumnRenamed("4046", "Small Hass").withColumnRenamed("4225", "Large Hass").withColumnRenamed("4770", "XLarge Hass")
pd_df=pd_df.rename(index=str, columns={"4046" : "Small Hass", "4225" : "Large Hass","4770" : "XLarge Hass" })

In [14]:
display(pd_df)

_c0,Date,AveragePrice,Total Volume,Small Hass,Large Hass,XLarge Hass,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [15]:
# to dispaly the datatypes of the columns
df.dtypes

In [16]:
#convert Date column format from string to date
#from pyspark.sql.types import DateType
#df=df.withColumn("Date",df['Date'].cast(DateType()))
pd_df['Date'] =pd.to_datetime(pd_df.Date)

pd_df.sort_values(by=['Date'], inplace=False, ascending=True)

pd_df.head()

Unnamed: 0,_c0,Date,AveragePrice,Total Volume,Small Hass,Large Hass,XLarge Hass,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,0,2015-12-27,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,1,2015-12-20,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2,2015-12-13,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,3,2015-12-06,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,4,2015-11-29,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany


In [17]:
#Data types of a dataframe
df.dtypes

In [18]:
#schema of dataframe
df.printSchema()

In [19]:
#from pyspark.sql.types import IntegerType
#df = df.withColumn("Small Hass",df["Small Hass"].cast(IntegerType()))
#df = df.withColumn("XLarge Bags",df["XLarge Bags"].cast(IntegerType()))
#display(df)
#pdf=df.toPandas()

In [20]:
# Specifying dependent and independent variables
X = df.drop('AveragePrice')
Y= df['AveragePrice']

In [21]:
#drop date column
df=df.drop('Date','Total Bags','Total Volume').collect()

In [22]:
# To display the dataframe
display(df)

AveragePrice,4046,4225,4770,Small Bags,Large Bags,XLarge Bags,type,year,region
1.33,1036.74,54454.85,48.16,8603.62,93.25,0.0,conventional,2015,Albany
1.35,674.28,44638.81,58.33,9408.07,97.49,0.0,conventional,2015,Albany
0.93,794.7,109149.67,130.5,8042.21,103.14,0.0,conventional,2015,Albany
1.08,1132.0,71976.41,72.58,5677.4,133.76,0.0,conventional,2015,Albany
1.28,941.48,43838.39,75.78,5986.26,197.69,0.0,conventional,2015,Albany
1.26,1184.27,48067.99,43.61,6556.47,127.44,0.0,conventional,2015,Albany
0.99,1368.92,73672.72,93.26,8196.81,122.05,0.0,conventional,2015,Albany
0.98,703.75,101815.36,80.0,6266.85,562.37,0.0,conventional,2015,Albany
1.02,1022.15,87315.57,85.34,11104.53,283.83,0.0,conventional,2015,Albany
1.07,842.4,64757.44,113.0,8061.47,564.45,0.0,conventional,2015,Albany


In [23]:
type_counts = pd_df.groupby('type').size()
print(type_counts) 
#Types of avocados are also balanced since the ratio is almost 0.5

In [24]:
#To display pandas data frame
display(pd_df)

_c0,Date,AveragePrice,Total Volume,Small Hass,Large Hass,XLarge Hass,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [25]:
# The average prices of avocados by types; organic or not
avg_type = spark.sql("select AveragePrice,type from avocado_csv")
display(avg_type)

AveragePrice,type
1.33,conventional
1.35,conventional
0.93,conventional
1.08,conventional
1.28,conventional
1.26,conventional
0.99,conventional
0.98,conventional
1.02,conventional
1.07,conventional


In [26]:
# The average prices of avocados sold with respect to region ; 
avg_type_sr = spark.sql("select AveragePrice,region from avocado_csv")
display(avg_type_sr)

AveragePrice,region
1.33,Albany
1.35,Albany
0.93,Albany
1.08,Albany
1.28,Albany
1.26,Albany
0.99,Albany
0.98,Albany
1.02,Albany
1.07,Albany


In [27]:
# To discover price variation over regions
price_var=spark.sql("SELECT MAX(AveragePrice)-Min(AveragePrice) as pricevariation,region FROM  avocado_csv GROUP BY region")
display(price_var)

pricevariation,region
2.16,PhoenixTucson
1.96,GrandRapids
1.52,SouthCarolina
1.3299999999999998,TotalUS
2.41,WestTexNewMexico
1.73,Louisville
1.54,Philadelphia
1.96,Sacramento
1.25,DallasFtWorth
1.33,Indianapolis


In [28]:
# Displaying the pandas data frame
display(pd_df)

_c0,Date,AveragePrice,Total Volume,Small Hass,Large Hass,XLarge Hass,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [29]:
#%sql
#select region,AveragePrice from avocado_csv

In [30]:
#Dropping the Date column (date format is not suitable for next level analysis (i.e. OHE))
pd_df = pd_df.drop(['Date'], axis = 1)

In [31]:
#But before droping we'd better to see the correlation between those columns:
pd_df[['Small Hass', "Large Hass", "XLarge Hass",'Small Bags','Large Bags','XLarge Bags','Total Volume','Total Bags']].corr()

Unnamed: 0,Small Hass,Large Hass,XLarge Hass,Small Bags,Large Bags,XLarge Bags,Total Volume,Total Bags
Small Hass,1.0,0.92611,0.833389,0.92528,0.838645,0.699377,0.977863,0.920057
Large Hass,0.92611,1.0,0.887855,0.916031,0.810015,0.688809,0.974181,0.905787
XLarge Hass,0.833389,0.887855,1.0,0.802733,0.698471,0.679861,0.872202,0.792314
Small Bags,0.92528,0.916031,0.802733,1.0,0.902589,0.806845,0.967238,0.994335
Large Bags,0.838645,0.810015,0.698471,0.902589,1.0,0.710858,0.88064,0.943009
XLarge Bags,0.699377,0.688809,0.679861,0.806845,0.710858,1.0,0.747157,0.804233
Total Volume,0.977863,0.974181,0.872202,0.967238,0.88064,0.747157,1.0,0.963047
Total Bags,0.920057,0.905787,0.792314,0.994335,0.943009,0.804233,0.963047,1.0


In [32]:
# Drop the columns from pandas data frame and store them in a variable df_v
df_V = pd_df.drop(['_c0','AveragePrice', 'Total Volume', 'Total Bags'], axis = 1).groupby('year').agg('sum')
df_V

Unnamed: 0_level_0,Small Hass,Large Hass,XLarge Hass,Small Bags,Large Bags,XLarge Bags
year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2015,1709450000.0,1761054000.0,142772400.0,634682700.0,132066400.0,5443128.28
2016,1525123000.0,1672728000.0,159879800.0,1106494000.0,336626300.0,20038284.84
2017,1652038000.0,1544735000.0,91217510.0,1222953000.0,399339000.0,23997172.34
2018,460499700.0,407758700.0,22932590.0,360741400.0,123584000.0,7210591.87


In [33]:
display(pd_df)

_c0,AveragePrice,Total Volume,Small Hass,Large Hass,XLarge Hass,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [34]:
# Total Bags = Small Bags + Large Bags + XLarge Bags
pd_df = pd_df.drop(['Total Bags'], axis = 1)

In [35]:
#Total Volume = Small Hass +Large Hass +XLarge Hass + Total Bags , to avoid multicollinearity I also drop Total Volume column.
pd_df = pd_df.drop(['Total Volume'], axis = 1)

In [36]:
#pd_df = pd_df.drop(['_c0',axis=1])
pd_df = pd_df.drop(['_c0'], axis = 1)

In [37]:
pd_df.info()

In [38]:
#Checking if the sample is balanced;
pd_df.groupby('region').size() # Approximately, there are 338 observations from each region, sample seems balanced.

In [39]:
len(pd_df.region.unique())

In [40]:
pd_df.region.unique() 

In [41]:
#basically we can remove states and work on cities rather than analysing both (to prevent multicollinerarity)

regionsToRemove = ['California', 'GreatLakes', 'Midsouth', 'NewYork', 'Northeast', 'SouthCarolina', 'Plains', 'SouthCentral', 'Southeast', 'TotalUS', 'West']
pd_df = pd_df[~pd_df.region.isin(regionsToRemove)]
len(pd_df.region.unique())

In [42]:
 # Standardizing (scaling) the variables
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
pd_df.loc[:,'Small Hass':'XLarge Bags']= scaler.fit_transform(pd_df.loc[:,'Small Hass':'XLarge Bags']) 
pd_df.head()

Unnamed: 0,AveragePrice,Small Hass,Large Hass,XLarge Hass,Small Bags,Large Bags,XLarge Bags,type,year,region
0,1.33,-0.403362,-0.193862,-0.34483,-0.357517,-0.369176,-0.231149,conventional,2015,Albany
1,1.35,-0.405153,-0.26495,-0.344323,-0.350996,-0.369077,-0.231149,conventional,2015,Albany
2,0.93,-0.404558,0.202237,-0.340723,-0.362067,-0.368945,-0.231149,conventional,2015,Albany
3,1.08,-0.402892,-0.066971,-0.343612,-0.381236,-0.368229,-0.231149,conventional,2015,Albany
4,1.28,-0.403833,-0.270746,-0.343452,-0.378732,-0.366733,-0.231149,conventional,2015,Albany


In [43]:
#scaling the data
#from sklearn.preprocessing import StandardScaler
#scaler = StandardScaler()
#pdf.loc[:,'Small Hass':'XLarge Bags']= scaler.fit_transform(pdf.loc[:,'Small Hass':'XLarge Bags'])
#pdf.head()
#data=spark.createDataFrame(pdf)
#display(data)

In [44]:
#Specifying dependent and independent variables
import numpy as np
X = pd_df.drop(['AveragePrice'], axis = 1)
y = pd_df['AveragePrice']
y=np.log1p(y)

In [45]:
#import numpy as np
#x = pdf.drop(['AveragePrice'], axis = 1)
#y = pdf['AveragePrice']
#y=np.log1p(y)

In [46]:
#Labeling the categorical variables
Xcat=pd.get_dummies(X[["type","region"]], drop_first = True)

In [47]:
# Numerical variable
Xnum=X[["Small Hass","Large Hass","XLarge Hass","Small Bags","Large Bags","XLarge Bags"]]

In [48]:
X= pd.concat([Xcat, Xnum], axis = 1) # Concatenate dummy categorcal variables and numeric variables
X.shape

In [49]:
F_DF = pd.concat([y,X],axis=1)
F_DF.head(3)

Unnamed: 0,AveragePrice,type_organic,region_Atlanta,region_BaltimoreWashington,region_Boise,region_Boston,region_BuffaloRochester,region_Charlotte,region_Chicago,region_CincinnatiDayton,region_Columbus,region_DallasFtWorth,region_Denver,region_Detroit,region_GrandRapids,region_HarrisburgScranton,region_HartfordSpringfield,region_Houston,region_Indianapolis,region_Jacksonville,region_LasVegas,region_LosAngeles,region_Louisville,region_MiamiFtLauderdale,region_Nashville,region_NewOrleansMobile,region_NorthernNewEngland,region_Orlando,region_Philadelphia,region_PhoenixTucson,region_Pittsburgh,region_Portland,region_RaleighGreensboro,region_RichmondNorfolk,region_Roanoke,region_Sacramento,region_SanDiego,region_SanFrancisco,region_Seattle,region_Spokane,region_StLouis,region_Syracuse,region_Tampa,region_WestTexNewMexico,Small Hass,Large Hass,XLarge Hass,Small Bags,Large Bags,XLarge Bags
0,0.845868,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-0.403362,-0.193862,-0.34483,-0.357517,-0.369176,-0.231149
1,0.854415,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-0.405153,-0.26495,-0.344323,-0.350996,-0.369077,-0.231149
2,0.65752,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-0.404558,0.202237,-0.340723,-0.362067,-0.368945,-0.231149


In [50]:
print(pd_df['Small Hass'])

In [51]:
print(pd_df['XLarge Bags'])

In [52]:
pd_df.info()

In [53]:
data = spark.createDataFrame(F_DF)

from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['Small Hass','Large Hass','XLarge Hass','Small Bags','Large Bags','XLarge Bags'], outputCol = 'features')
data= vectorAssembler.transform(data)
data = data.selectExpr('features','AveragePrice as label')

display(data)

features,label
"List(1, 6, List(), List(-0.4033623150128266, -0.19386203152037398, -0.3448301825594494, -0.3575165856337041, -0.36917615187617087, -0.23114861172709608))",0.8458682675776092
"List(1, 6, List(), List(-0.4051534194973697, -0.264949562917268, -0.3443228592245013, -0.3509959350654252, -0.36907697646849313, -0.23114861172709608))",0.8544153281560676
"List(1, 6, List(), List(-0.4045583612784526, 0.20223657178579285, -0.34072270926548376, -0.3620672207701793, -0.36894482055967726, -0.23114861172709608))",0.6575200029167942
"List(1, 6, List(), List(-0.402891585516613, -0.06697130505404313, -0.3436120079439692, -0.3812357205779629, -0.3682286057051743, -0.23114861172709608))",0.7323678937132266
"List(1, 6, List(), List(-0.40383304450904023, -0.27074618577951204, -0.34345237818272695, -0.3787321863191294, -0.36673325574931626, -0.23114861172709608))",0.8241754429663495
"List(1, 6, List(), List(-0.4026332921010442, -0.24011552180541423, -0.3450571561262158, -0.3741102208016475, -0.36837643319963737, -0.23114861172709608))",0.8153648132841945
"List(1, 6, List(), List(-0.4017208398495658, -0.05468666782926969, -0.3425804006119409, -0.3608140756674931, -0.36850250759760506, -0.23114861172709608))",0.688134638736401
"List(1, 6, List(), List(-0.405007792809847, 0.14912167082781452, -0.34324186643508864, -0.37645780090928, -0.35820323507197677, -0.23114861172709608))",0.6830968447064438
"List(1, 6, List(), List(-0.4034344118357298, 0.044114531811126616, -0.34297548427101554, -0.3372448966378506, -0.364718404424467, -0.23114861172709608))",0.7030975114131134
"List(1, 6, List(), List(-0.4043226506237247, -0.11925091643788387, -0.34159568452227757, -0.3619111045044759, -0.3581545829851915, -0.23114861172709608))",0.7275486072772778


In [54]:
final_data=data.select('features','label')

In [55]:
final_data.head()
type(final_data)

In [56]:
# split the data into test and training test
splits = final_data.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

In [57]:
# implement linear regression on the data
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

In [58]:
# display the predictions
train_data ,test_data = final_data.randomSplit([0.80,0.20]) 
print(train_data.count())
print(test_data.count())
train_data.describe().show() 

In [59]:
from pyspark.ml.regression import LinearRegression 
model=LinearRegression(featuresCol='features',labelCol='label')

In [60]:
trained_model=model.fit(train_data)  
overall_results=trained_model.evaluate(train_data)


In [61]:
# calculate the r-squared error
print('R-squared Error :',overall_results.r2)

In [62]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features',labelCol='label', maxIter=10,regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

In [63]:
lr_predictions1 = lr_model.transform(train_data)
lr_predictions1.select("prediction","label","features").show(10)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="r2")
display(lr_predictions1) # to display predictions of the data using linear regression

features,label,prediction
"List(1, 6, List(), List(-0.40596550941347026, -0.1389663285311798, -0.31026435313444733, -0.33186277621914034, -0.36952116066467244, -0.23114861172709608))",0.8960880245566356,0.8646952866355297
"List(1, 6, List(), List(-0.4058186967603892, -0.09094514419246132, -0.3275333002428397, -0.3437850009464041, -0.3679530664828998, -0.23114861172709608))",0.8458682675776092,0.8646952866355297
"List(1, 6, List(), List(-0.4055964270486561, 0.14293774706020484, -0.3288277978379139, -0.35336061825304904, -0.36879488792684373, -0.23114861172709608))",0.7514160886839212,0.8646952866355297
"List(1, 6, List(), List(-0.4053748985653971, -0.257747199988821, -0.3381791087886887, -0.3544166321209531, -0.3674887665200689, -0.23114861172709608))",0.688134638736401,0.8646952866355297
"List(1, 6, List(), List(-0.40527957658364233, 0.07767008527024863, -0.29901394686789323, -0.3213819925889703, -0.36828942081365595, -0.23114861172709608))",0.7466879474879752,0.8646952866355297
"List(1, 6, List(), List(-0.4052402420592894, 0.1303320730154078, -0.3427270604550823, -0.2463759894718768, -0.36059256712251486, -0.23114861172709608))",0.7275486072772778,0.8646952866355297
"List(1, 6, List(), List(-0.4052030818051269, -0.1714775165534509, -0.1146365842030111, -0.21397927050639737, -0.3631631749771809, -0.23114861172709608))",0.8628899551470399,0.8646952866355297
"List(1, 6, List(), List(-0.4051634507893843, 0.09754547262531582, -0.1348432177612654, -0.230366452855677, -0.3688196817787631, -0.2226628752381756))",0.7178397931503169,0.8646952866355297
"List(1, 6, List(), List(-0.4051534194973697, -0.264949562917268, -0.3443228592245013, -0.3509959350654252, -0.36907697646849313, -0.23114861172709608))",0.8544153281560676,0.8646952866355297
"List(1, 6, List(), List(-0.40512381977364176, -0.07131359418750001, -0.34430440203335766, -0.19411983868986224, -0.35747930137678197, -0.23114861172709608))",0.7839015438284094,0.8646952866355297


In [64]:
# calculate the rmse value
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

In [65]:
print('R-squared Error :%g' % trainingSummary.r2)

In [66]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="rmse")
test_result = lr_model.evaluate(test_data)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

In [67]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="rmse")
test_result = lr_model.evaluate(train_data)
print("Root Mean Squared Error (RMSE) on train data = %g" % test_result.rootMeanSquaredError)

In [68]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
               labelCol="label",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions1))

In [69]:
# Implement the decision tree algorithm on that data
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'label')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) #calculate the root mean squared error value

In [70]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'label')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
display(dt_model) # predict the data model

treeNode
"{""index"":31,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.33381382425417633,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":15,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.3279433913734907,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.3499846242374268,""categories"":null,""feature"":0,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.5744632033393906,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.40909278766449725,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":1.003840712392561,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.9428856853469054,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":-0.36932971002508697,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0737173964768656,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.9827234235062082,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [71]:
# predict the values using desicion tree
display(dt_predictions)

features,label,prediction
"List(1, 6, List(), List(-0.4046599095793889, -0.3424191585599094, -0.34488056570284154, -0.3169932287830297, -0.3550085706425838, -0.23114861172709608))",0.7839015438284094,0.8218158480211756
"List(1, 6, List(), List(-0.4046358443616001, -0.16819096276001297, -0.3403081707292577, 0.1562814174790084, -0.37135730913229104, -0.23114861172709608))",0.8671004876833832,0.8218158480211756
"List(1, 6, List(), List(-0.4043226506237247, -0.11925091643788387, -0.34159568452227757, -0.3619111045044759, -0.3581545829851915, -0.23114861172709608))",0.7275486072772778,0.9827234235062082
"List(1, 6, List(), List(-0.4039681457522323, -0.35980852020828374, -0.3404598190024379, -0.3328146315193243, -0.3650428296377902, -0.23114861172709608))",0.7747271675523681,0.8218158480211756
"List(1, 6, List(), List(-0.4034365366906885, -0.2047252135898524, -0.34428045756917125, 0.2621930103695364, -0.3591975620956515, -0.23114861172709608))",0.8837675401685949,0.8218158480211756
"List(1, 6, List(), List(-0.4031071347568485, -0.3343471066687593, -0.33970008110752536, -0.35430234139684, -0.3624834491877669, -0.23114861172709608))",0.7275486072772778,0.9827234235062082
"List(1, 6, List(), List(-0.40285333812735524, -0.18057713249587895, -0.329813511613585, -0.2322969123489225, -0.37135730913229104, -0.23114861172709608))",0.904218150639886,0.8218158480211756
"List(1, 6, List(), List(-0.4019566987499885, -0.1971018745651281, -0.3435147335582122, 0.04214526954390356, -0.35404839364136415, -0.23114861172709608))",0.9162907318741552,0.8218158480211756
"List(1, 6, List(), List(-0.40177791444206284, -0.30497596725451936, -0.33908899842776974, -0.34528115568802, -0.35946468076444393, -0.23114861172709608))",0.688134638736401,0.9827234235062082
"List(1, 6, List(), List(-0.4017208398495658, -0.05468666782926969, -0.3425804006119409, -0.3608140756674931, -0.36850250759760506, -0.23114861172709608))",0.688134638736401,0.9827234235062082


In [72]:
from pyspark.ml.regression import LinearRegression
dt = LinearRegression(featuresCol ='features', labelCol = 'label')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [73]:
# Calculating the r2 value using linera regression
dt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = dt_evaluator.evaluate(dt_predictions)
print("R2 = %f" % r2)