In [1]:
displayHTML("<font size=8 color='green'>Introduction to Spark Data Frames and SQL using PySpark</font>")

### [MSTC](http://mstc.ssr.upm.es/big-data-track) and MUIT:

## Sources:
* [Databriks: introduction-to-dataframes-python](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)
* [Introduction to Spark with Python, by Jose A. Dianes](http://jadianes.github.io/spark-py-notebooks)
* [Complete Guide on DataFrame Operations in PySpark](https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/)
* [Understanding-DataFrames](https://github.com/awantik/pyspark-tutorial/wiki/Understanding-DataFrames)
* [From Pandas to Spark Dataframes](https://github.com/awantik/pyspark-tutorial/wiki/Migrating-from-Pandas-to-Apache-Spark%E2%80%99s-DataFrame)
* [Also ML](https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/)

## This notebook will introduce Spark capabilities to deal with data in a structured way.
* ### Basically, everything turns around the concept of *Data Frame* and using *SQL language* to query them.</font>")

## In Apache Spark, a DataFrame is a **distributed collection of rows under named columns**.
- ### In simple terms, it is same as a table in relational database or an Excel sheet with Column headers.

## It also shares some common characteristics with RDD:<br>

*    **Immutable** in nature : We can create DataFrame / RDD once but can’t change it. And we can transform a DataFrame / RDD after applying transformations.
*    **Lazy Evaluations**: Which means that a task is not executed until an action is performed.
*    **Distributed**: RDD and DataFrame both are distributed in nature.

### PERFORMANCE:

![How to create a DataFrame](https://camo.githubusercontent.com/cc93c064c6fd754df0209d42ec054998edd81fa0/68747470733a2f2f7777772e736166617269626f6f6b736f6e6c696e652e636f6d2f6c6962726172792f766965772f6c6561726e696e672d7079737061726b2f393738313738363436333730382f67726170686963732f4230353739335f30335f30332e6a7067)

## How to create a DataFrame ?
 
 ![How to create a DataFrame](https://www.analyticsvidhya.com/wp-content/uploads/2016/10/DataFrame-in-Spark.png)

* ### A Spark `DataFrame` is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.

## <font color=#AA1B5A> DataFrame RDD of Row objects

From: http://www.cs.sfu.ca/CourseCentral/732/ggbaker/content/spark-sql.html

### Think of a DataFrame being implemented with an RDD of Row objects.
- ### Row is a generic row object with an ordered collection of field
- ### Nicest way to create Rows: create a custom subclass for your data

In [12]:
from pyspark.sql import Row

NameAge = Row('fname lname', 'age') # build a Row subclass

user1 = NameAge('John Smith', 47)
user2 = NameAge('Jane Smith', 22)
user3 = NameAge('Frank Jones', 28)

data_rows = [ user1, user2, user3 ]

print(data_rows)

In [13]:
df1 = spark.createDataFrame(data_rows)

df1.show()

In [14]:
# Databricks DISPLAY
display(df1)

fname lname,age
John Smith,47
Jane Smith,22
Frank Jones,28


## TO DO: create another DataFrame df2 with sames users but with their weights:

fname lname|  weight

- 'John Smith' 80.5
- 'Jane Smith' 62.3
- 'Frank Jones' 71.5

In [16]:
???

In [17]:
NameWeight = Row('fname lname', 'weight') # build a Row subclass

df2 =  spark.createDataFrame([NameWeight('John Smith', 80.5), 
                              NameWeight('Jane Smith', 62.3),
                              NameWeight('Frank Jones', 71.5)])

display(df2)

fname lname,weight
John Smith,80.5
Jane Smith,62.3
Frank Jones,71.5


## TO DO: Join both DataFrames into df

In [19]:
df = df1.join(df2, "fname lname")

display(df)

fname lname,age,weight
Frank Jones,28,71.5
John Smith,47,80.5
Jane Smith,22,62.3


## We can apply functions to Columns using `pyspark.sql.functions` or our own Used-Definded Functions (UDF)

### for example:

- 1.- `select(\*cols)` : Projects a set of expressions and returns a new DataFrame.<br>
- 2.- apply `split` function to the "fname lname" column : split fname and lname
- 3.- `alias` returns this column aliased with a new name or names (in the case of expressions that return more than one column, such as explode)

In [21]:
import pyspark.sql.functions as f

df_new= df.select(f.split(df['fname lname'],' ').alias('sep names'))

df_new.show()

- ## `explode(col)`: this function returns a new row for each element in the given array or map.

In [23]:
import pyspark.sql.functions as f

df_new = df.select(f.explode(f.split(df['fname lname'],' ')).alias('all'))

df_new.show()

# Creating a Data Frame from CSV file

## <font color=#F01B5A>We will read our Orange Churn dataset

In [26]:
# File location and type
file_location = "/FileStore/tables/churn_bigml_80-bf1a8.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls,Churn
KS,128,415,No,Yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,False
OH,107,415,No,Yes,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,False
NJ,137,415,No,No,0,243.4,114,41.38,121.2,110,10.3,162.6,104,7.32,12.2,5,3.29,0,False
OH,84,408,Yes,No,0,299.4,71,50.9,61.9,88,5.26,196.9,89,8.86,6.6,7,1.78,2,False
OK,75,415,Yes,No,0,166.7,113,28.34,148.3,122,12.61,186.9,121,8.41,10.1,3,2.73,3,False
AL,118,510,Yes,No,0,223.4,98,37.98,220.6,101,18.75,203.9,118,9.18,6.3,6,1.7,0,False
MA,121,510,No,Yes,24,218.2,88,37.09,348.5,108,29.62,212.6,118,9.57,7.5,7,2.03,3,False
MO,147,415,Yes,No,0,157.0,79,26.69,103.1,94,8.76,211.8,96,9.53,7.1,6,1.92,0,False
WV,141,415,Yes,Yes,37,258.6,84,43.96,222.0,111,18.87,326.4,97,14.69,11.2,5,3.02,0,False
RI,74,415,No,No,0,187.7,127,31.91,163.4,148,13.89,196.0,94,8.82,9.1,5,2.46,0,False


In [27]:
type(df)

In [28]:
df.printSchema()

In [29]:
display(df.describe())

summary,State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls
count,2666,2666.0,2666.0,2666,2666,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0,2666.0
mean,,100.62040510127532,437.4388597149288,,,8.021755438859715,179.48162040510135,100.31020255063764,30.512404351087813,200.3861590397601,100.02363090772693,17.033072018004518,201.16894223555968,100.10615153788449,9.052689422355604,10.23702175543886,4.467366841710428,2.764489872468112,1.562640660165041
stddev,,39.56397365334985,42.52101801942717,,,13.61227701829193,54.21035022086982,19.988162186059512,9.215732907163495,50.95151511764598,20.16144511531889,4.330864176799864,50.780323368725206,19.418458551101697,2.2851195129157564,2.7883485770512566,2.456194903012946,0.7528120531228477,1.3112357589949093
min,AK,1.0,408.0,No,No,0.0,0.0,0.0,0.0,0.0,0.0,0.0,43.7,33.0,1.97,0.0,0.0,0.0,0.0
max,WY,243.0,510.0,Yes,Yes,50.0,350.8,160.0,59.64,363.7,170.0,30.91,395.0,166.0,17.77,20.0,20.0,5.4,9.0


In [30]:
# Convert to a Date type
df = df.withColumn('Voice mail plan', f.regexp_replace(df['Voice mail plan'],'Yes','1'))

In [31]:
display(df)

State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls,Churn
KS,128,415,No,1,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,False
OH,107,415,No,1,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,False
NJ,137,415,No,No,0,243.4,114,41.38,121.2,110,10.3,162.6,104,7.32,12.2,5,3.29,0,False
OH,84,408,Yes,No,0,299.4,71,50.9,61.9,88,5.26,196.9,89,8.86,6.6,7,1.78,2,False
OK,75,415,Yes,No,0,166.7,113,28.34,148.3,122,12.61,186.9,121,8.41,10.1,3,2.73,3,False
AL,118,510,Yes,No,0,223.4,98,37.98,220.6,101,18.75,203.9,118,9.18,6.3,6,1.7,0,False
MA,121,510,No,1,24,218.2,88,37.09,348.5,108,29.62,212.6,118,9.57,7.5,7,2.03,3,False
MO,147,415,Yes,No,0,157.0,79,26.69,103.1,94,8.76,211.8,96,9.53,7.1,6,1.92,0,False
WV,141,415,Yes,1,37,258.6,84,43.96,222.0,111,18.87,326.4,97,14.69,11.2,5,3.02,0,False
RI,74,415,No,No,0,187.7,127,31.91,163.4,148,13.89,196.0,94,8.82,9.1,5,2.46,0,False


In [32]:
df.count()

In [33]:
df.columns

## `groupby`:
* ### How to find Churn vs no_Churn cases?

In [35]:
df.groupby('Churn').count().show()

In [36]:
df.crosstab('State', 'Churn').show()

In [37]:
dc=df.groupBy("State").agg(f.count("Churn").alias('Num Churn'))

In [38]:
dc.show()

## Use `filter()` to return the rows that match a predicate

In [40]:
filterDF = df.filter( df.State == "CA" )
#filterDF = df.filter( (df.State == "CA") & (df.Churn == 'False') )
#filterDF = df.filter( (df.State == "CA") & (df['Total day calls'] >  90) )

display(filterDF)

State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls,Churn
CA,116,415,No,1,34,268.6,83,45.66,178.2,142,15.15,166.3,106,7.48,11.6,3,3.13,2,False
CA,151,415,Yes,No,0,218.0,57,37.06,114.4,88,9.72,269.2,95,12.11,12.4,1,3.35,0,True
CA,93,415,No,1,36,178.7,134,30.38,178.6,102,15.18,126.8,82,5.71,8.0,4,2.16,2,False
CA,96,510,No,1,31,183.4,126,31.18,195.5,106,16.62,180.1,93,8.1,10.5,5,2.84,1,False
CA,113,415,No,No,0,187.6,97,31.89,208.2,118,17.7,158.9,101,7.15,8.7,6,2.35,2,False
CA,112,415,No,No,0,111.9,92,19.02,114.0,143,9.69,146.8,79,6.61,14.1,3,3.81,5,True
CA,60,415,Yes,No,0,183.0,110,31.11,206.7,93,17.57,203.8,119,9.17,11.1,6,3.0,1,False
CA,72,408,No,1,39,92.8,98,15.78,271.2,115,23.05,167.1,83,7.52,5.8,7,1.57,1,False
CA,92,408,No,No,0,249.4,118,42.4,211.5,95,17.98,169.0,116,7.61,9.1,3,2.46,0,False
CA,103,415,No,1,18,149.9,84,25.48,170.9,84,14.53,171.5,112,7.72,11.5,7,3.11,0,True


In [41]:
filterDF.count()

In [42]:
countDistinctDF = df.select("State", "Churn")\
  .groupBy("State")\
  .agg(f.countDistinct("Churn"))

In [43]:
countDistinctDF.show()

# Spark SQL schema

## For using Spark SQL we need the schema in our data.

In [46]:
df.printSchema()

## COLUMNS?

## <font color=#F81B5A>...worth mentioning PARQUET

![Parquet](https://parquet.apache.org/assets/img/parquet_logo.png)
https://parquet.apache.org/

### Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

## Before SQL Note that you can also convert freely between Pandas DataFrame and Spark DataFrame</font>

In [49]:
import pandas as pd

In [50]:
pd.DataFrame(df.take(5), columns=df.columns)

## or...

In [52]:
df.toPandas().head(5)

In [53]:
CV_data.groupby('Churn').agg({'Customer service calls': 'mean'}).show()

### <font color=#F81BA0 size=5>TO DO:</font>

- ### How to find the mean of 'Customer service calls' in every state

In [55]:
???

In [56]:
df.groupby('State').agg({'Total day minutes': 'mean', 'Customer service calls': 'mean'}).toPandas()

In [57]:
CV_data.groupby('State').agg({'Total day minutes': 'mean', 'Customer service calls': 'mean'}).toPandas()

# <font color=#F81B5A>SQL Syntax

## There is also a spark.sql function where you can do the same things with SQL query syntax.

### Apply SQL Queries on DataFrame

* ### <font color=brown>To apply SQL queries on DataFrame first we need to register DataFrame as table. Let’s first register train DataFrame as table.

In [60]:
df.registerTempTable('df_table')

In [61]:
Mean_DayMin_ServiceCalls = sqlContext.sql("""
    SELECT State, MEAN(`Total day minutes`), MEAN(`Customer service calls`) 
    FROM df_table GROUP BY State
""")

In [62]:
type(Mean_DayMin_ServiceCalls)

In [63]:
Mean_DayMin_ServiceCalls.show()

In [64]:
Mean_DayMin_ServiceCalls.toPandas()

### <font color=red>...NOW order: descend by average Day Minutes

In [66]:
Day_min = sqlContext.sql("""
    SELECT State, MEAN(`Total day minutes`) as average_DayMin, MEAN(`Customer service calls`) 
    FROM df_table GROUP BY State order by average_DayMin desc
""")

In [67]:
pd.DataFrame(Day_min.take(5))

## <font color=#F81B5A>... same as before but using SQL-like methods:

In [69]:
import pyspark.sql.functions as f

Day_min2=df.groupby('State').agg(f.mean('Total day minutes').alias("average_DayMin")
                            , f.mean('Customer service calls')) \
                            .orderBy(f.desc("average_DayMin"))

In [70]:
pd.DataFrame(Day_min2.take(5))

### <font color=brownUDFs> We can register a user defined function (UDF) from Python

In [72]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

binary_map = {'Yes':1.0, 'No':0.0, 'True':1.0, 'False':0.0}

toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

In [73]:
pd.DataFrame(df.take(5), columns=df.columns)

In [74]:
df = df.withColumn('Churn', toNum(df['Churn'])) \
    .withColumn('International plan', toNum(df['International plan'])) \
    .withColumn('Voice mail plan', toNum(df['Voice mail plan']))

### <font color=red>...NOTE that you MUST assign CV_data = ... to a NEW dataFrame

In [76]:
df = df.drop('Voice mail plan2')

In [77]:
df.columns

In [78]:
pd.DataFrame(df.take(5), columns=df.columns)

## `sample`:
- ###   How to create a sample DataFrame from the base DataFrame?

### The sample method on DataFrame will return a DataFrame containing the sample of base DataFrame. The sample method will take 3 parameters.

- ### withReplacement = True or False to select a observation with or without replacement. fraction = x, where x = .5 shows that we want to have 50% data in sample DataFrame;  seed for reproduce the result

### Let’s create the two DataFrame t1 and t2 from train, both will have 20% sample of train and count the number of rows in each.

In [80]:
t1 = df.sample(False, 0.5, 42)

In [81]:
t1.count()

## `appy`: apply map operation on DataFrame columns

We can apply a function on each row of DataFrame using map operation. After applying this function, we get the result in the form of RDD. Let’s apply a map operation on User_ID column of train and print the first 5 elements of mapped RDD(x,1) after applying the function (I am applying lambda function).

## RETURN TO: Notebook with Word Count Example

# AND Follow HandySpark: bringing pandas-like capabilities to Spark DataFrames

https://towardsdatascience.com/handyspark-bringing-pandas-like-capabilities-to-spark-dataframes-5f1bcea9039e