## Spark SQL - Running Example

**Report By** : OUMOUSS EL MEHDI (M2 Data & Knowledge)

**Paper - Spark SQL: Relational Data Processing in Spark**

https://pdfs.semanticscholar.org/f845/6b259bbf137ba89db548d77ab6643a2e40b2.pdf

![About Spark](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_about.png)

**The SparkSession, which is going to be our access point to the Spark Framework:**

In [5]:
spark

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **DataFrames API**

**1. Using a semi-structured dataset (JSON):**

In [8]:
df = spark.read.json('/FileStore/tables/rj2rp8ke1488295662051/moviepeople_1000-1c188.json')

In [9]:
df.show(5)

In [10]:
df.printSchema()

Notice that the above cell takes 0.07 seconds to infer the schema by sampling the file and reading through it.

Inferring the schema works for ad hoc analysis against smaller datasets. But when working on multi-TB+ data, it's better to provide an **explicit pre-defined schema manually**, so there's no inferring cost.

In [12]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

Count the total number of rows in the DataFrame (and see how long a full scan from remote disk takes):

In [14]:
df.count()

**Q. Show the first 5 people in the dataset**

In [16]:
df.select('person-name').take(5)

**Q. The person named Anabela Teixeira**

In [18]:
df.filter(df['person-name']=='Teixeira, Anabela').show()

**Q. The birth date and birth notes (birthplace) of Steven Spielberg**

In [20]:
df.filter(df['person-name']=='Spielberg, Steven').select('info.birthdate', 'info.birthnotes').show()

**2. Using a CSV file: Fire Safety Complaints**

Information on complaints received by the Fire Department (from the public) for a particular location. Key fields include Complaint Number, Complaint Type, Address, and Disposition.

Dataset : https://data.sfgov.org/Housing-and-Buildings/Fire-Safety-Complaints/2wsq-7wmv 

API Doc: https://dev.socrata.com/foundry/data.sfgov.org/v3w9-dyka

In [22]:
df_csv = spark.read.load('/FileStore/tables/9rlemeo91488310038961/Fire_Safety_Complaints.csv',format='com.databricks.spark.csv', header='true', inferSchema='true')

In [23]:
df_csv.printSchema()

In [24]:
df_csv.columns

In [25]:
# Rename columns whose names contain spaces so they are easier to reference later
df_csv = df_csv \
  .withColumnRenamed("Neighborhood  District", "NeighborhoodDistrict") \
  .withColumnRenamed("Complaint Item Type Description", "ComplaintItemTypeDescription")


**Q. How many complaints of each complaint type were there?**

In [27]:
display(df_csv.select('ComplaintItemTypeDescription').groupBy('ComplaintItemTypeDescription').count().orderBy("count", ascending=False))

The SF Fire Department receives more complaints about alarm systems than about any other type.

Note that the above command took about 2.15 seconds to execute. 
In an upcoming section, we'll cache the data into memory for up to 100x speed increases.

In [29]:
from pyspark.sql.functions import unix_timestamp, year, dayofyear

# Java-style pattern describing how dates are written in the CSV
from_pattern = 'dd/MM/yyyy'

df_csv = df_csv \
  .withColumn('ReceivedDate', unix_timestamp(df_csv['Received Date'], from_pattern).cast("timestamp")) \
  .drop('Received Date') \
  .withColumn('EntryDate', unix_timestamp(df_csv['Entry Date'], from_pattern).cast("timestamp")) \
  .drop('Entry Date')
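
The pattern `'dd/MM/yyyy'` means day first, then month, then a four-digit year. As a rough plain-Python illustration of what that pattern accepts, the sketch below uses `datetime.strptime` with the equivalent `%d/%m/%Y` format. The sample strings and the helper `parse_day_first` are hypothetical and not taken from the CSV. It is only meant to show why it is worth checking the file's actual date layout before relying on the conversion.

```python
from datetime import datetime

# Python equivalent of the Java-style pattern 'dd/MM/yyyy' used in the cell above.
PY_FROM_PATTERN = "%d/%m/%Y"


def parse_day_first(text):
    """Return a datetime for a day-first date string, or None if it does not match."""
    try:
        return datetime.strptime(text, PY_FROM_PATTERN)
    except ValueError:
        return None


# Hypothetical sample values, not taken from the dataset.
ok = parse_day_first("27/02/2017")   # day-first value: matches the pattern
bad = parse_day_first("02/27/2017")  # month-first value: "27" is not a valid month

print(ok.date().isoformat())
print(bad)
```

If the dates in the file were written month-first, the pattern would have to be changed accordingly.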

In [30]:
display(df_csv)

In [31]:
df_csv.printSchema()

**Q. How many years of fire complaints are in the data file?**

In [33]:
from pyspark.sql.functions import year, dayofyear

In [34]:
df_csv.select(year('ReceivedDate')).distinct().orderBy('year(ReceivedDate)').show()

**Q. How many complaints were received in 2017 up to and including Feb. 27th?**

Note that Feb. 27th, 2017 is the 58th day of the year, so the filter below keeps days 1 through 58 and reports one count per day.
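
The day number can be double-checked with the Python standard library (January has 31 days, plus 27 days of February). This is only a sanity check on the constant used in the filter, not part of the Spark job:

```python
from datetime import date

# Day of the year for the cutoff date used in the filter
cutoff = date(2017, 2, 27)
day_of_year = cutoff.timetuple().tm_yday

print(day_of_year)  # 58
```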

In [37]:
display(df_csv.filter(year('ReceivedDate') == '2017').filter(dayofyear('ReceivedDate') <= 58).groupBy(dayofyear('ReceivedDate')).count().orderBy('dayofyear(ReceivedDate)'))

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Memory and Caching**

The DataFrame is currently comprised of 2 partitions:

In [40]:
df_temp = df_csv
df_temp = df_temp.select('*', year('ReceivedDate').alias('ReceivedYear'))

In [41]:
df_temp.rdd.getNumPartitions()

In [42]:
# Repartition into 6 partitions, register the result as a temp view,
# then mark the view for caching (it is materialized by the first action)
df_temp.repartition(6).createOrReplaceTempView("df_VIEW")
spark.catalog.cacheTable("df_VIEW")

In [43]:
spark.table("df_VIEW").count()

In [44]:
fireComplaintsDF = spark.table("df_VIEW")
fireComplaintsDF.count()

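Note that the second full scan + count took less time (0.12 seconds compared to 0.78 seconds for the first). `cacheTable` is lazy, so the first count also populates the in-memory cache and later scans read from it.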

In [46]:
spark.catalog.isCached("df_VIEW")

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **SQL Queries**

In [48]:
%sql SELECT count(*) FROM df_VIEW;

In [49]:
%sql SELECT NeighborhoodDistrict, ReceivedYear FROM df_VIEW

**Q. Which neighborhood in SF generated the most complaints in 2017?**

In [51]:
%sql SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count FROM df_VIEW WHERE ReceivedYear == '2017' GROUP BY NeighborhoodDistrict ORDER BY Neighborhood_Count DESC LIMIT 15

SQL also has some handy commands like `DESC` (describe) to see the schema + data types for the table:

In [53]:
%sql DESC df_VIEW;

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Spark Internals and SQL UI**

![Catalyst](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/catalyst.png)

In [56]:
# Note that a SQL query simply returns a DataFrame
spark.sql("SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count FROM df_VIEW WHERE ReceivedYear == '2017' GROUP BY NeighborhoodDistrict ORDER BY Neighborhood_Count DESC LIMIT 15")

The `explain()` method can be called on a DataFrame to print its query plans. Passing `True` requests the extended output, which includes the logical plans as well as the physical plan:

In [58]:
spark.sql("SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count FROM df_VIEW WHERE ReceivedYear == '2017' GROUP BY NeighborhoodDistrict ORDER BY Neighborhood_Count DESC LIMIT 15").explain(True)

We can view the visual representation of the SQL Query plan from the Spark UI.

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Spark MLlib**

#### 1. Loading data

In [62]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.util import MLUtils

# Load the data file
data = MLUtils.loadLibSVMFile(sc, '/FileStore/tables/23phx7tu1475934034017/sample_libsvm_data.txt')
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])

#### 2. Training

In [64]:
# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

#### 3. Prediction & Evaluation

In [66]:
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
# Index into the (label, prediction) pair; tuple-unpacking lambdas are Python 2 only
testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())
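
The test error computed above is the fraction of test instances whose predicted label differs from the true label. The plain-Python sketch below reproduces that arithmetic on a small, made-up list of (label, prediction) pairs. The values and the helper name `error_rate` are hypothetical and only stand in for what the RDD would contain.

```python
# Hypothetical (label, prediction) pairs standing in for the contents of
# labelsAndPredictions; they are not results from the model above.
labels_and_predictions = [(1.0, 1.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0)]


def error_rate(pairs):
    """Fraction of pairs whose prediction differs from the true label."""
    mismatches = sum(1 for label, prediction in pairs if label != prediction)
    return mismatches / float(len(pairs))


err = error_rate(labels_and_predictions)
print('Test Error = ' + str(err))  # one mismatch out of four pairs
```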

**References:**

1) Spark 2.0 preview docs: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/

2) DataFrame user documentation: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/sql-programming-guide.html

3) PySpark API 2.0 docs: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/index.html

4) Spark SQL : http://spark.apache.org/docs/latest/sql-programming-guide.html - http://spark.apache.org/sql/