# Practicing with Apache Spark

Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets. 
It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. 
It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, 
pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, 
and Structured Streaming for incremental computation and stream processing.

Here are some examples of using Apache Spark.

## 1 Run Apache Spark

There are two ways to run Apache Spark here: in local mode (on my own computer) or on a cluster in the cloud (for example, the IBM Watson Studio default runtime). I run this notebook in local mode because IBM Watson Studio is not free.

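```python
from IPython.display import Markdown, display


def printmd(string):
    """Display a string as a red Markdown heading in the notebook."""
    display(Markdown('# <span style="color:red">' + string + '</span>'))


# This only checks whether a variable named `sc` (a SparkContext) already exists.
# IBM Watson Studio Spark notebooks predefine it, but so does any local kernel
# that has already run the SparkContext setup cell below.
if 'sc' in locals() or 'sc' in globals():
    printmd('You are running in an IBM Watson Studio Apache Spark Notebook')
else:
    printmd('You are running in an IBM Watson Studio Default Runtime')
```

You are running in an IBM Watson Studio Apache Spark Notebook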

PySpark is the Python API for Spark. Install PySpark if necessary.

```python
# Uncomment to install PySpark into the current environment.
#!pip install pyspark==3.3.0
```

```python
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    # PySpark is missing, or it was installed after the kernel started.
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')
```

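```python
# Reuse an existing SparkContext or create one in local mode,
# with one worker thread per logical core ("local[*]").
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# The SparkSession is the entry point for DataFrames and Spark SQL;
# getOrCreate() reuses the SparkContext created above.
spark = SparkSession \
    .builder \
    .getOrCreate()
```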

## 2 Work with spark.sql

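We calculate the minimum temperature for the test data set. Every query in this section reads from the temporary view `washing`, which is registered in section 3. These functions can therefore only be called after that step.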

```python
def minTemperature():
    #some reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    return spark.sql("SELECT min(temperature) as mintemp from washing").first().mintemp
```
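Like other SQL aggregates, `min` ignores NULL values. This matters here because many rows of `washing` carry no temperature reading (see the `null` entries in the table in section 3). The plain-Python sketch below mimics that rule without calling Spark. `sql_min` and the sample readings are hypothetical, and `None` stands in for SQL NULL.

```python
from typing import Optional, Sequence


def sql_min(values: Sequence[Optional[float]]) -> Optional[float]:
    """Mimic SQL MIN: skip None (NULL) and return None if no value is left."""
    present = [v for v in values if v is not None]
    return min(present) if present else None


# Hypothetical temperature column; None plays the role of NULL.
readings = [100, None, None, 86, None]
print(sql_min(readings))      # 86
print(sql_min([None, None]))  # None
```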

We calculate the mean temperature for the test data set.

```python
def meanTemperature():
    #some reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    return spark.sql("SELECT mean(temperature) as meantemp from washing").first().meantemp
```
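Because NULLs are skipped, `mean` (the same aggregate as `avg` in Spark SQL) divides by the number of non-NULL temperatures, not by the number of rows. The hypothetical plain-Python helper below illustrates the difference on made-up data.

```python
from typing import Optional, Sequence


def sql_mean(values: Sequence[Optional[float]]) -> Optional[float]:
    """Mimic SQL AVG/MEAN: sum of the non-None values divided by their count."""
    present = [v for v in values if v is not None]
    if not present:
        return None  # an aggregate over only NULLs is NULL
    return sum(present) / len(present)


readings = [100, None, None, 86, None]  # hypothetical data
print(sql_mean(readings))  # 93.0, i.e. (100 + 86) / 2

# Treating NULL as 0 and dividing by all five rows gives a misleading value.
print(sum(v or 0 for v in readings) / len(readings))  # 37.2
```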

We calculate the maximum temperature for the test data set.

```python
def maxTemperature():
    #some reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    return spark.sql("SELECT max(temperature) as maxtemp from washing").first().maxtemp
```

We calculate the population standard deviation of the temperature for the test data set.

```python
def sdTemperature():
    #some reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    #https://spark.apache.org/docs/2.3.0/api/sql/
    return spark.sql("SELECT stddev_pop(temperature) as sdtemp from washing").first().sdtemp
```
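`stddev_pop` divides the sum of squared deviations by n. Spark's `stddev` and `stddev_samp` divide by n - 1 instead. Python's `statistics.pstdev` and `statistics.stdev` follow the same two conventions, so the hypothetical helpers below can be checked against them. The two results converge as the number of rows grows; the gap only matters for small samples.

```python
import math
import statistics


def stddev_pop(xs):
    """Population standard deviation: sqrt(sum((x - mean)**2) / n)."""
    m = sum(xs) / len(xs)
    return math.sqrt(sum((x - m) ** 2 for x in xs) / len(xs))


def stddev_samp(xs):
    """Sample standard deviation: sqrt(sum((x - mean)**2) / (n - 1))."""
    m = sum(xs) / len(xs)
    return math.sqrt(sum((x - m) ** 2 for x in xs) / (len(xs) - 1))


values = [80.0, 90.0, 100.0]  # hypothetical temperatures
print(stddev_pop(values), statistics.pstdev(values))  # both about 8.165
print(stddev_samp(values), statistics.stdev(values))  # both 10.0
```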

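We calculate the skewness of the temperature. Skewness measures the asymmetry of a distribution around its mean. It is 0 for a symmetric distribution, positive when the right tail is longer, and negative when the left tail is longer.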

```python
def skewTemperature():
    return spark.sql("SELECT skewness(temperature) as sktemperature from washing").first().sktemperature
```

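As far as I understand Spark's implementation, `skewness` uses population (divide-by-n) central moments, g1 = m3 / m2^(3/2), with no small-sample correction. The sketch below computes that formula in plain Python. `central_moment`, `skewness` and the sample lists are hypothetical.

```python
from typing import Optional, Sequence


def central_moment(xs: Sequence[float], k: int) -> float:
    """k-th central moment with the population convention (divide by n)."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** k for x in xs) / len(xs)


def skewness(xs: Sequence[float]) -> Optional[float]:
    """Population skewness g1 = m3 / m2**1.5; None for constant input."""
    m2 = central_moment(xs, 2)
    if m2 == 0:
        return None  # undefined when every value is identical
    return central_moment(xs, 3) / m2 ** 1.5


print(skewness([1, 2, 3]))      # 0.0 for a symmetric sample
print(skewness([1, 1, 1, 10]))  # positive: the long tail is on the right
```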
We calculate the kurtosis of the temperature. Kurtosis is a measure of the combined weight of a distribution's tails relative to the center of the distribution.

```python
def kurtosisTemperature():
    return spark.sql("SELECT kurtosis(temperature) as ktemperature from washing").first().ktemperature
```

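Spark's `kurtosis` reports excess kurtosis, m4 / m2^2 - 3, computed from population central moments. A normal distribution therefore scores 0 and a continuous uniform distribution scores -1.2. The plain-Python sketch below implements that formula; the helper names and inputs are hypothetical.

```python
from typing import Optional, Sequence


def central_moment(xs: Sequence[float], k: int) -> float:
    """k-th central moment with the population convention (divide by n)."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** k for x in xs) / len(xs)


def excess_kurtosis(xs: Sequence[float]) -> Optional[float]:
    """Excess kurtosis m4 / m2**2 - 3; None for constant input."""
    m2 = central_moment(xs, 2)
    if m2 == 0:
        return None  # undefined when every value is identical
    return central_moment(xs, 4) / m2 ** 2 - 3


print(excess_kurtosis([-1, 1]))       # -2.0, the smallest possible value
print(excess_kurtosis([1, 2, 3, 4]))  # about -1.36: flat, light tails
```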

We calculate the Pearson correlation coefficient between the temperature and the hardness.

```python
def correlationTemperatureHardness():
    #some reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
    #https://spark.apache.org/docs/2.3.0/api/sql/
    return spark.sql("SELECT corr(temperature,hardness) as temperaturehardness from washing").first().temperaturehardness
```
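The Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations, so it always lies between -1 and 1. The sketch below assumes that rows where either value is NULL are skipped, which is my understanding of Spark's `corr`. `pearson_corr` and the data are hypothetical.

```python
import math
from typing import Optional, Sequence


def pearson_corr(xs: Sequence[Optional[float]],
                 ys: Sequence[Optional[float]]) -> Optional[float]:
    """Pearson correlation over the rows where both values are present."""
    pairs = [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]
    n = len(pairs)
    if n < 2:
        return None
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in pairs)
    ss_x = sum((x - mean_x) ** 2 for x, _ in pairs)
    ss_y = sum((y - mean_y) ** 2 for _, y in pairs)
    if ss_x == 0 or ss_y == 0:
        return None  # undefined when either column is constant
    # The 1/n factors of covariance and variances cancel, so raw sums suffice.
    return cov / math.sqrt(ss_x * ss_y)


temperature = [100, None, 86, 90, 95]  # hypothetical values
hardness = [77, 70, 75, None, 76]
print(pearson_corr(temperature, hardness))  # uses only the 3 complete rows
```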

## 3 Create a DataFrame (using Spark)

Now we load a Parquet file into a DataFrame and register it as a temporary view named `washing`. Spark SQL can then query it like a database table.

```python
# Load the Parquet file and register it as the temporary view `washing`
# that the spark.sql queries in section 2 refer to.
df = spark.read.parquet('washing.parquet')
df.createOrReplaceTempView('washing')
df.show()
```

```
+--------------------+--------------------+-----+--------+----------+---------+--------+-----+-----------+-------------+-------+
|                 _id|                _rev|count|flowrate|fluidlevel|frequency|hardness|speed|temperature|           ts|voltage|
+--------------------+--------------------+-----+--------+----------+---------+--------+-----+-----------+-------------+-------+
|0d86485d0f88d1f9d...|1-57940679fb8a713...|    4|      11|acceptable|     null|      77| null|        100|1547808723923|   null|
|0d86485d0f88d1f9d...|1-15ff3a0b304d789...|    2|    null|      null|     null|    null| 1046|       null|1547808729917|   null|
|0d86485d0f88d1f9d...|1-97c2742b68c7b07...|    4|    null|      null|       71|    null| null|       null|1547808731918|    236|
|0d86485d0f88d1f9d...|1-eefb903dbe45746...|   19|      11|acceptable|     null|      75| null|         86|1547808738999|   null|
|0d86485d0f88d1f9d...|1-5f68b4c72813c25...|    7|    null|      null|       75|    null| null|   
```

## 4 Test the spark.sql functions

Now let's test the functions we've implemented.

```python
# Placeholders; each one is overwritten by the corresponding query below.
min_temperature = 0
mean_temperature = 0
max_temperature = 0
sd_temperature = 0
skew_temperature = 0
kurtosis_temperature = 0
correlation_temperature = 0
```


```python
min_temperature = minTemperature()
print("Min temperature is", min_temperature)
```

Min temperature is 80


```python
mean_temperature = meanTemperature()
print('Mean temperature is', mean_temperature)
```

Mean temperature is 90.03800298062593


```python
max_temperature = maxTemperature()
print('Max temperature is', max_temperature)
```

Max temperature is 100


```python
sd_temperature = sdTemperature()
print('Std temperature is', sd_temperature)
```

Std temperature is 6.098487624200337


```python
skew_temperature = skewTemperature()
print('Skew temperature is', skew_temperature)
```

Skew temperature is 0.010410008042945581


```python
kurtosis_temperature = kurtosisTemperature()
print('Kurtosis temperature is', kurtosis_temperature)
```

Kurtosis temperature is -1.2239269305786886


```python
correlation_temperature = correlationTemperatureHardness()
print('Correlation between temperature and hardness is', correlation_temperature)
```

Correlation between temperature and hardness is 0.017754069047296324
