## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. You can use a different language in a cell with the `%LANGUAGE` syntax; Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/weatherHistory.csv"
file_type = "csv"

# Read the CSV, treating the first row as the header and letting Spark infer column types
mainDf = spark.read.csv(file_location, header=True, inferSchema=True)

In [0]:
df = mainDf
df.show()

+-------------------+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|     Formatted Date|      Summary|Precip Type|   Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+-------------------+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|2006-03-31 22:00:00|Partly Cloudy|       rain| 9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|Partly cloudy thr...|
|2006-03-31 23:00:00|Partly Cloudy|       rain| 9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.

In [0]:
df.printSchema()

root
 |-- Formatted Date: timestamp (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Loud Cover: double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)
 |-- Daily Summary: string (nullable = true)



In [0]:
df.columns

Out[126]: ['Formatted Date',
 'Summary',
 'Precip Type',
 'Temperature (C)',
 'Apparent Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Loud Cover',
 'Pressure (millibars)',
 'Daily Summary']

In [0]:
# Drop the columns that are not used as features
df = df.drop("Formatted Date", "Loud Cover", "Daily Summary")

In [0]:
df.show()

+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+--------------------+
|      Summary|Precip Type|   Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Pressure (millibars)|
+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+--------------------+
|Partly Cloudy|       rain| 9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|             1015.13|
|Partly Cloudy|       rain| 9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.0|15.826300000000002|             1015.63|
|Mostly Cloudy|       rain| 9.377777777777778|       9.377777777777778|    0.89|3.9284000000000003|                 204.0|           14.9569|             1015.94|
|Partly Cloudy|       

In [0]:
df.count()

Out[129]: 96453

In [0]:
df.printSchema()

root
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)



In [0]:
df.columns

Out[131]: ['Summary',
 'Precip Type',
 'Temperature (C)',
 'Apparent Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Pressure (millibars)']

In [0]:
# Replace missing values in the numeric feature columns with the column mean
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=[
        'Apparent Temperature (C)',
        'Humidity',
        'Wind Speed (km/h)',
        'Wind Bearing (degrees)',
        'Visibility (km)',
        'Pressure (millibars)',
    ],
    outputCols=[
        'Apparent Temperature (C)_imputed',
        'Humidity_imputed',
        'Wind Speed (km/h)_imputed',
        'Wind Bearing (degrees)_imputed',
        'Visibility (km)_imputed',
        'Pressure (millibars)_imputed',
    ],
).setStrategy("mean")

In [0]:
# Fit the imputer and add the imputed columns to df
df = imputer.fit(df).transform(df)
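
As a rough illustration of what the `mean` strategy does for one column, the plain-Python sketch below fills missing entries with the mean of the observed ones. The helper name `impute_mean` and the sample values are hypothetical; Spark's `Imputer` does the equivalent per column on the DataFrame and treats both null and NaN as missing by default.

```python
import math

def impute_mean(values):
    """Return a copy of values with None/NaN replaced by the mean of the rest."""
    observed = [v for v in values if v is not None and not math.isnan(v)]
    if not observed:
        raise ValueError("cannot impute a column with no observed values")
    mean = sum(observed) / len(observed)
    return [mean if v is None or math.isnan(v) else v for v in values]

# Hypothetical humidity readings with two missing entries
humidity = [0.5, None, 0.75, float("nan"), 1.0]
filled = impute_mean(humidity)
print(filled)
```

The original columns stay untouched in the notebook because the imputer writes to separate `_imputed` output columns.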

In [0]:
df.printSchema()

root
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)
 |-- Apparent Temperature (C)_imputed: double (nullable = true)
 |-- Humidity_imputed: double (nullable = true)
 |-- Wind Speed (km/h)_imputed: double (nullable = true)
 |-- Wind Bearing (degrees)_imputed: double (nullable = true)
 |-- Visibility (km)_imputed: double (nullable = true)
 |-- Pressure (millibars)_imputed: double (nullable = true)



In [0]:
# Drop only the rows in which every column is null
df = df.na.drop(how="all")

In [0]:
df.count()

Out[136]: 96453
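
The unchanged count of 96453 is consistent with `how="all"`, which removes a row only when every column in it is null. The sketch below contrasts that with `how="any"` (the default for `na.drop`) on a few hypothetical rows; `drop_nulls` is an illustrative helper, not a Spark function.

```python
def drop_nulls(rows, how="any"):
    """Filter rows the way the two drop modes do.

    'any' drops a row if at least one value is None;
    'all' drops a row only if every value is None.
    """
    if how == "any":
        return [r for r in rows if all(v is not None for v in r)]
    if how == "all":
        return [r for r in rows if any(v is not None for v in r)]
    raise ValueError("how must be 'any' or 'all'")

# Hypothetical rows: complete, partially missing, entirely missing
rows = [
    ("rain", 9.4, 0.89),
    (None, 8.2, None),
    (None, None, None),
]
kept_all = drop_nulls(rows, how="all")
kept_any = drop_nulls(rows, how="any")
print(len(kept_all), len(kept_any))
```

With `how="all"`, partially missing rows survive, so any nulls left in non-imputed columns are still present after this step.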

In [0]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                                StringIndexer)

In [0]:
Summary_indexer = StringIndexer(inputCol='Summary', outputCol='SummaryIndex')
df = Summary_indexer.fit(df).transform(df)
# Summary_encoder = OneHotEncoder(inputCol='SummaryIndex', outputCol='SummaryVec')
# df = Summary_encoder.fit(df).transform(df)

In [0]:
Precip_Type_indexer = StringIndexer(inputCol='Precip Type', outputCol='Precip_Type_Index')
df = Precip_Type_indexer.fit(df).transform(df)
# Precip_Type_encoder = OneHotEncoder(inputCol='Precip_Type_Index', outputCol='Precip_Type_Vec')
# df = Precip_Type_encoder.fit(df).transform(df)
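
By default, `StringIndexer` orders labels by descending frequency, so the most common label gets index 0.0. The sketch below mimics that mapping in plain Python; `index_labels` and the sample labels are hypothetical, and breaking ties alphabetically is an assumption made here to keep the result deterministic.

```python
from collections import Counter

def index_labels(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically for ties (assumed tie-break)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return mapping, [mapping[label] for label in labels]

# Hypothetical sample of the Summary column
summary = [
    "Partly Cloudy", "Partly Cloudy", "Mostly Cloudy",
    "Partly Cloudy", "Clear", "Mostly Cloudy",
]
mapping, indexed = index_labels(summary)
print(mapping)
print(indexed)
```

Because the one-hot encoding lines are commented out, the regression later receives these indices as plain numbers, which means it treats the frequency rank as if it were a magnitude.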

In [0]:
df.columns

Out[140]: ['Summary',
 'Precip Type',
 'Temperature (C)',
 'Apparent Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Pressure (millibars)',
 'Apparent Temperature (C)_imputed',
 'Humidity_imputed',
 'Wind Speed (km/h)_imputed',
 'Wind Bearing (degrees)_imputed',
 'Visibility (km)_imputed',
 'Pressure (millibars)_imputed',
 'SummaryIndex',
 'Precip_Type_Index']

In [0]:
# Combine the imputed numeric columns and the indexed categorical columns into one feature vector
featureassembler = VectorAssembler(
    inputCols=[
        'Apparent Temperature (C)_imputed',
        'Humidity_imputed',
        'Wind Speed (km/h)_imputed',
        'Wind Bearing (degrees)_imputed',
        'Visibility (km)_imputed',
        'Pressure (millibars)_imputed',
        'SummaryIndex',
        'Precip_Type_Index',
    ],
    outputCol='features',
)

In [0]:
output=featureassembler.transform(df)

In [0]:
finalized_output=output.select("features","Temperature (C)")

In [0]:
finalized_output.show()

+--------------------+------------------+
|            features|   Temperature (C)|
+--------------------+------------------+
|[7.38888888888888...| 9.472222222222221|
|[7.22777777777777...| 9.355555555555558|
|[9.37777777777777...| 9.377777777777778|
|[5.94444444444444...|  8.28888888888889|
|[6.97777777777777...| 8.755555555555553|
|[7.11111111111111...| 9.222222222222221|
|[5.52222222222222...| 7.733333333333334|
|[6.52777777777777...|  8.77222222222222|
|[10.8222222222222...| 10.82222222222222|
|[13.7722222222222...| 13.77222222222222|
|[16.0166666666666...|16.016666666666666|
|[17.1444444444444...|17.144444444444446|
|[17.8000000000000...|17.800000000000004|
|[17.3333333333333...|17.333333333333332|
|[18.8777777777777...| 18.87777777777778|
|[18.9111111111111...|18.911111111111115|
|[15.3888888888888...| 15.38888888888889|
|[15.5500000000000...|15.550000000000002|
|[14.2555555555555...|14.255555555555553|
|[13.1444444444444...|13.144444444444442|
+--------------------+------------

In [0]:
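# Split the data 75/25 into training and test sets; no seed is set, so the split differs between runs
from pyspark.ml.regression import LinearRegression
train_data, test_data = finalized_output.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol='features', labelCol='Temperature (C)')
# fit() returns the fitted model, which replaces the estimator held in `regressor`
regressor = regressor.fit(train_data)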
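
The `randomSplit` call above is made without a seed, so the training and test sets change on every run; `randomSplit` also accepts a `seed` argument for repeatable splits. The plain-Python sketch below shows the general idea of a seeded, weight-based split. It is a simplified illustration under that assumption, not Spark's actual sampling code, and `random_split` is a hypothetical helper.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to weights."""
    total = float(sum(weights))
    # Cumulative, normalized upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds = [w / total for w in accumulate(weights)]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds[:-1]):
            if u < upper:
                splits[i].append(row)
                break
        else:
            # u fell above every earlier bound, so the row goes to the last split
            splits[-1].append(row)
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.75, 0.25], seed=42)
train_again, test_again = random_split(rows, [0.75, 0.25], seed=42)
print(len(train), len(test), train == train_again)
```

Each row is assigned independently, so the split sizes are only approximately 75% and 25%.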

In [0]:
# Fitted coefficients, one per feature in the assembler's column order
regressor.coefficients

Out[146]: DenseVector([0.8724, -1.3864, 0.0823, -0.0004, 0.004, -0.0002, -0.0345, 0.0011])

In [0]:
# Fitted intercept
regressor.intercept

Out[147]: 2.9168210689111556
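
A linear regression prediction is the intercept plus the dot product of the coefficients and the feature vector. The sketch below applies that formula with the rounded coefficients and the intercept printed above. The feature values are hypothetical (in particular the two index values are made up), and because the coefficients are rounded to four decimals the result will differ slightly from what Spark computes.

```python
def predict(coefficients, intercept, features):
    """Return intercept + sum(coefficient_i * feature_i)."""
    if len(coefficients) != len(features):
        raise ValueError("coefficients and features must have the same length")
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Rounded coefficients and intercept as printed in the cell outputs above
coefficients = [0.8724, -1.3864, 0.0823, -0.0004, 0.004, -0.0002, -0.0345, 0.0011]
intercept = 2.9168210689111556

# Hypothetical feature vector in assembler order:
# apparent temp, humidity, wind speed, wind bearing, visibility, pressure,
# SummaryIndex, Precip_Type_Index
features = [7.39, 0.89, 14.12, 251.0, 15.83, 1015.13, 0.0, 0.0]
print(predict(coefficients, intercept, features))
```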

In [0]:
# Evaluate the model on the test set; the returned summary holds the predictions and error metrics
pred_results=regressor.evaluate(test_data)

In [0]:
pred_results.predictions.show()

+--------------------+-------------------+-------------------+
|            features|    Temperature (C)|         prediction|
+--------------------+-------------------+-------------------+
|(8,[0,1,4,5],[4.9...|  4.972222222222224|  5.735330013060961|
|(8,[0,1,4,5],[6.0...|  6.038888888888888|  7.063212602276785|
|(8,[0,1,4,5],[18....|  18.33888888888889|  17.87690384846665|
|(8,[0,1,4,5],[19....| 19.400000000000002|  18.40985114001645|
|(8,[0,1,4,6],[7.7...|  7.777777777777778|  8.234204739390227|
|(8,[0,1,5,6],[2.8...|  2.844444444444443|  3.625886130558225|
|[-21.555555555555...| -12.11111111111111|-14.937994131934225|
|[-21.399999999999...|-12.666666666666666|-15.245928331614401|
|[-21.255555555555...|-11.555555555555555|-14.433730809203432|
|[-19.927777777777...| -12.63888888888889| -14.48954916291752|
|[-19.505555555555...|-12.105555555555554|-14.020812351896538|
|[-19.005555555555...|-14.033333333333331|-14.518550990085593|
|[-17.694444444444...|              -13.4|-13.393602535
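
Some rows in the `features` column print as `(8,[0,1,4,5],[...])`. That is Spark's sparse vector notation: the vector length, the positions of the non-zero entries, and their values. The sketch below expands such a triple into a dense list; `sparse_to_dense` is a hypothetical helper and the values are made up, since the real ones are truncated in the output above.

```python
def sparse_to_dense(size, indices, values):
    """Expand (size, indices, values) into a dense list, filling gaps with 0.0."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Hypothetical values for a row printed as (8,[0,1,4,5],[...])
dense = sparse_to_dense(8, [0, 1, 4, 5], [4.97, 0.93, 16.1, 1030.2])
print(dense)
```

Read against the assembler's column order, a row with indices `[0,1,4,5]` has zero wind speed, wind bearing, `SummaryIndex`, and `Precip_Type_Index`.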

In [0]:
# Check how the model performed: mean absolute error and mean squared error on the test set
pred_results.meanAbsoluteError,pred_results.meanSquaredError

Out[150]: (0.7437343123513911, 0.9045644878384197)
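
For reference, the two metrics above can be computed by hand as shown in the sketch below, which also takes the square root of the reported mean squared error to get an error in the label's own units (degrees Celsius). The function name `regression_errors` and the toy labels are hypothetical.

```python
import math

def regression_errors(labels, predictions):
    """Return (MAE, MSE, RMSE) for paired labels and predictions."""
    errors = [p - y for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / len(errors)
    mse = sum(e * e for e in errors) / len(errors)
    return mae, mse, math.sqrt(mse)

# Toy example with made-up values
mae, mse, rmse = regression_errors([1.0, 2.0, 3.0], [1.5, 2.0, 2.0])
print(mae, mse, rmse)

# RMSE implied by the mean squared error reported above
reported_mse = 0.9045644878384197
reported_rmse = math.sqrt(reported_mse)
print(reported_rmse)
```

The implied RMSE is roughly 0.95 °C. The evaluation summary also exposes `rootMeanSquaredError` and `r2` directly, which avoids the manual square root.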