## Spark and SparkSQL in Python
In this note, I demonstrate Apache Spark and SparkSQL in Python, using the PySpark library.

Apache Spark is an open-source engine for processing large-scale data. It provides an interface for handling data over a cluster of machines so that the data can be processed in a parallel and fault-tolerant manner. Like many other parallel data processing schemes, Spark follows the MapReduce programming model. However, Spark has an advantage over schemes built on plain MapReduce, which comes from a data structure called the Resilient Distributed Dataset (RDD). When a task requires multiple iterations of MapReduce procedures, plain MapReduce has to write the data to and read it back from a stable storage system such as HDFS (Hadoop Distributed File System) at each iteration, and this I/O takes up most of the computing time. An RDD instead keeps these intermediate results in distributed memory (RAM) rather than on disk. Since data processing is much faster in RAM than on a disk drive, RDDs speed up parallel data processing significantly.

As Spark has continued to improve, the Spark dataframe was introduced to deal with more structured data. Like an RDD, a Spark dataframe is a dataset distributed over memory, but it is organized into named columns, as in a table of a relational database. To make handling data in a Spark dataframe more convenient, Spark provides a module called SparkSQL, which allows the data in a Spark dataframe to be handled with SQL-like commands.

### Data processing with PySpark
I load PySpark here. I also load Pandas for comparison.

In [1]:
import pyspark
import pandas as pd

For demonstration, I use the [credit card fraud dataset](https://github.com/GuansongPang/anomaly-detection-datasets), which I also used in my [anomaly detection note](anomaly_detection_PCA.ipynb). The dataset has 284807 records, each with 29 features ("V1"~"V28", "Amount") and a label ("class") indicating whether the record is fraudulent (1 for fraud, 0 otherwise). I preview the first few rows with Pandas.

In [2]:
df_pd=pd.read_csv("fraud_normalized.csv")
print("Dataset size:",len(df_pd))
df_pd.head()

Dataset size: 284807


Unnamed: 0,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,class
0,0.935192,0.76649,0.881365,0.313023,0.763439,0.267669,0.266815,0.786444,0.475312,0.5106,...,0.561184,0.522992,0.663793,0.391253,0.585122,0.394557,0.418976,0.312697,0.005824,0
1,0.978542,0.770067,0.840298,0.271796,0.76612,0.262192,0.264875,0.786298,0.453981,0.505267,...,0.55784,0.480237,0.666938,0.33644,0.58729,0.446013,0.416345,0.313423,0.000105,0
2,0.935217,0.753118,0.868141,0.268766,0.762329,0.281122,0.270177,0.788042,0.410603,0.513018,...,0.565477,0.54603,0.678939,0.289354,0.559515,0.402727,0.415489,0.311911,0.014739,0
3,0.941878,0.765304,0.868484,0.213661,0.765647,0.275559,0.266803,0.789434,0.414999,0.507585,...,0.559734,0.510277,0.662607,0.223826,0.614245,0.389197,0.417669,0.314371,0.004807,0
4,0.938617,0.77652,0.864251,0.269796,0.762975,0.263984,0.268968,0.782484,0.49095,0.524303,...,0.561327,0.547271,0.663392,0.40127,0.566343,0.507497,0.420561,0.31749,0.002724,0


To read this dataset and convert it into an RDD, I first construct the Spark context. While Spark needs multiple machines to demonstrate its power, here I use only my local machine.

Now suppose I am interested in summing up the "V4" feature over the fraudulent records (i.e., "class"=1). Reading the file directly with 'textFile' stores every line as a string, so I have to parse the necessary fields myself. To do this, I proceed as follows:
* map: split each line into fields at the commas, since the file is a CSV.
* filter: remove the header, since it is not numeric.
* filter: keep the fraudulent records only.
* map: take the "V4" field and convert it to a numeric value.

At this point, I have the "V4" values of the 492 fraudulent records in an RDD. To sum them up, I use the reduce function with addition.


In [3]:
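# Run Spark locally, using all available cores
sc = pyspark.SparkContext("local[*]")

# Each element of this RDD is one line of the file, as a string
rdd_txt = sc.textFile("fraud_normalized.csv")
anomaly = (
    rdd_txt.map(lambda r: r.split(","))   # split each line into fields
    .filter(lambda r: len(r[29]) < 4)     # drop the header: "class" is longer than a "0"/"1" label
    .filter(lambda r: int(r[29]) > 0)     # keep fraudulent records (class = 1)
    .map(lambda r: float(r[3]))           # convert the V4 field to a float
)
print(anomaly.count())
anomaly_sum = anomaly.reduce(lambda v1, v2: v1 + v2)
print(anomaly_sum)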

492
223.01106799999997
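
The same map, filter, and reduce sequence can also be traced without Spark. The following is a minimal plain-Python sketch, assuming a tiny made-up sample with only three columns ("V3", "V4", "class") instead of the real 30-column file; the values and the names `CLASS_COL` and `V4_COL` are illustrative and not taken from the dataset.

```python
from functools import reduce

# Made-up CSV lines (hypothetical values): a header plus four records
lines = [
    "V3,V4,class",
    "0.5,0.25,0",
    "0.5,0.75,1",
    "0.5,0.5,1",
    "0.5,0.125,0",
]

CLASS_COL = 2  # position of "class" in this toy sample (29 in the real file)
V4_COL = 1     # position of "V4" in this toy sample (3 in the real file)

# map: split each line into its comma-separated fields
fields = map(lambda line: line.split(","), lines)
# filter: drop the header, whose "class" field is longer than any label
records = filter(lambda r: len(r[CLASS_COL]) < 4, fields)
# filter: keep fraudulent records only
fraud = filter(lambda r: int(r[CLASS_COL]) > 0, records)
# map: convert the V4 field to a float
v4_values = list(map(lambda r: float(r[V4_COL]), fraud))

fraud_count = len(v4_values)
# reduce: combine the values pairwise with addition
fraud_sum = reduce(lambda a, b: a + b, v4_values)
print(fraud_count, fraud_sum)
```

Unlike this sketch, Spark evaluates the map and filter steps lazily and in parallel across partitions; nothing is computed until an action such as `count` or `reduce` is called.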


### SparkSQL with PySpark
While one can read the file in string format with the Spark context, it is inconvenient to parse the dataset manually from string data. Instead, one can use a Spark dataframe.

To create a Spark dataframe with PySpark, one needs to use the SparkSQL module. To access this module, I construct a SQL context from the Spark context constructed previously. With this SQL context, I read the CSV file directly into a Spark dataframe. Note that the 'inferSchema' option loads the data in numeric form instead of as strings, and the 'header' option interprets the first row of the file as the header rather than as a line of data. I show the first 5 rows of the dataset, which are identical to the sample shown with Pandas.

In [4]:
sqlc=pyspark.sql.SQLContext(sc)
df=sqlc.read.csv("fraud_normalized.csv",inferSchema=True,header=True)
df.show(5)

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----+
|      V1|      V2|      V3|      V4|      V5|      V6|      V7|      V8|      V9|     V10|     V11|     V12|     V13|     V14|     V15|     V16|     V17|     V18|     V19|     V20|     V21|     V22|     V23|     V24|     V25|     V26|     V27|     V28|  Amount|class|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----+
|0.935192| 0.76649|0.881365|0.313023|0.763439|0.267669|0.266815|0.786444|0.475312|  0.5106|0.252484|0.680908|0.371591|0.635591|0.446084|0.434392|0.737173|0.655066|0.594863|0.582942|0.561184|0.5

SparkSQL allows MapReduce procedures on Spark dataframes to be performed through a SQL-like interface. It even accepts a SQL query on a dataframe written as a string. Here I repeat the previous task of summing up "V4" over the fraudulent records. I use 'GROUP BY' to sum the data within each class, and it gives the same result as before.

In [5]:
# createOrReplaceTempView supersedes registerTempTable, which is deprecated since Spark 2.0
df.createOrReplaceTempView("table")

V4=sqlc.sql("""
    SELECT class, COUNT(V4) AS count, SUM(V4) AS total_V4
    FROM table
    GROUP BY class
""")
V4.show()

+-----+------+------------------+
|class| count|          total_V4|
+-----+------+------------------+
|    1|   492|223.01106799999997|
|    0|284315| 71528.47105600013|
+-----+------+------------------+
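
To connect the query back to MapReduce, the 'GROUP BY' aggregation can be read as a map to key-value pairs followed by a per-key reduce. Below is a minimal plain-Python sketch, assuming a made-up list of `(class, V4)` rows rather than the real dataframe; the helper name `group_count_sum` is hypothetical, and the sketch illustrates the logic only, not how Spark actually plans or executes the query.

```python
from collections import defaultdict

def group_count_sum(rows):
    """Return {class: (count, total_V4)} for an iterable of (class, V4) rows."""
    # map: emit a (key, (1, value)) pair for every row
    pairs = ((label, (1, v4)) for label, v4 in rows)
    # reduce by key: add up the counts and values of pairs sharing a key
    totals = defaultdict(lambda: (0, 0.0))
    for label, (n, v4) in pairs:
        count, total = totals[label]
        totals[label] = (count + n, total + v4)
    return dict(totals)

# Made-up rows (hypothetical values), not taken from the dataset
toy_rows = [(0, 0.25), (1, 0.75), (1, 0.5), (0, 0.125)]
result = group_count_sum(toy_rows)
for label, (count, total) in sorted(result.items()):
    print(label, count, total)
```

Each key plays the role of one 'GROUP BY' group, and the running `(count, total)` pair corresponds to the 'COUNT(V4)' and 'SUM(V4)' columns of the query.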



### References
The explanation of the concepts of Apache Spark, RDDs, and Spark dataframes is based on [this tutorial](https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm) and [this blog post](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html). Documentation of the Spark Python API can be found [here](https://spark.apache.org/docs/latest/api/python/index.html).