<div style="float:left;font-size:20px;">
    <h1>PySpark</h1>
</div><div style="float:right;"><img src="../assets/banner.jpg"></div>

<hr>

In [1]:
import pyspark
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark import sql

from pyspark import SparkConf, SparkContext

In [2]:
# Create (or reuse) a SparkContext with the default configuration.
# To pin the application name / master explicitly, use something like
#   SparkConf().setAppName('Test App').setMaster('local[*]')
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

# Legacy SQL entry point; the SparkSession below is the modern equivalent.
sqlContext = sql.SQLContext(sc)

from pyspark.sql import SparkSession

# getOrCreate() returns the existing session if one is active, otherwise builds one.
# NOTE(review): "spark.some.config.option" looks like the placeholder from the Spark
# SQL guide; replace it with real settings if any are needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## Creating a Spark DataFrame from an RDD

In [3]:
# Three (ID, Animal, Legs, Popularity) tuples distributed as an RDD.
records = [(1, 'Cat', 4, 10),
           (2, 'Dog', 4, 9),
           (3, 'Parrot', 2, 3)]
data = sc.parallelize(records)

# Turn the RDD into a DataFrame, naming the columns positionally.
column_names = ['ID', 'Animal', 'Legs', 'Popularity']
data_df = spark.createDataFrame(data, column_names)

In [4]:
data_df.show(n=20)  # print up to 20 rows (the show() default) as an ASCII table

+---+------+----+----------+
| ID|Animal|Legs|Popularity|
+---+------+----+----------+
|  1|   Cat|   4|        10|
|  2|   Dog|   4|         9|
|  3|Parrot|   2|         3|
+---+------+----+----------+



In [4]:
# A DataFrame is made of Row objects; head(n) returns the first n of them as a list.
data_df.head(1)

[Row(ID=1, Animal='Cat', Legs=4, Popularity=10)]

## Creating a Spark DataFrame from JSON

In [7]:
# Example only: reading JSON mirrors reading CSV. Uncomment and point it at a real
# JSON file to try it (no 'test.json' is provided with this notebook).
#json_df = spark.read.json('test.json')

## Loading a CSV file into a Spark DataFrame

Run the following here or in a `pyspark` shell (adjust the file path to your copy of `iris.csv`):

In [6]:
import os

# Location of the iris CSV. The default is an absolute, machine-specific path, so it
# can be overridden through the IRIS_CSV environment variable instead of editing code.
IRIS_CSV = os.environ.get("IRIS_CSV", "file:///home/tau/iris.csv")

# header=True takes column names from the first line; inferSchema=True lets Spark
# scan the data to choose column types instead of reading every column as a string.
data_csv = spark.read.csv(IRIS_CSV, header=True, inferSchema=True)

In [7]:
# Peek at the first five rows of the CSV-backed DataFrame.
data_csv.show(n=5)

+---+-----------------+----------------+-----------------+----------------+------+
|_c0|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+---+-----------------+----------------+-----------------+----------------+------+
|  0|              5.1|             3.5|              1.4|             0.2|   0.0|
|  1|              4.9|             3.0|              1.4|             0.2|   0.0|
|  2|              4.7|             3.2|              1.3|             0.2|   0.0|
|  3|              4.6|             3.1|              1.5|             0.2|   0.0|
|  4|              5.0|             3.6|              1.4|             0.2|   0.0|
+---+-----------------+----------------+-----------------+----------------+------+
only showing top 5 rows



In [15]:
# Show the schema: column names plus the types chosen by inferSchema=True when the
# CSV was read.
data_csv.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- target: double (nullable = true)



In [20]:
# withColumn does not modify data_csv; it returns a new DataFrame with the extra column.
petal_area = data_csv["petal length (cm)"] * data_csv["petal width (cm)"]
data_df2 = data_csv.withColumn("petal area (cm2)", petal_area)
data_df2.show(n=5)

+---+-----------------+----------------+-----------------+----------------+------+-------------------+
|_c0|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|   petal area (cm2)|
+---+-----------------+----------------+-----------------+----------------+------+-------------------+
|  0|              5.1|             3.5|              1.4|             0.2|   0.0|0.27999999999999997|
|  1|              4.9|             3.0|              1.4|             0.2|   0.0|0.27999999999999997|
|  2|              4.7|             3.2|              1.3|             0.2|   0.0|               0.26|
|  3|              4.6|             3.1|              1.5|             0.2|   0.0|0.30000000000000004|
|  4|              5.0|             3.6|              1.4|             0.2|   0.0|0.27999999999999997|
+---+-----------------+----------------+-----------------+----------------+------+-------------------+
only showing top 5 rows



## Example: Monte Carlo estimate of Pi

In [5]:
from random import random
from operator import add


def throw_darts(_):
    """Throw one dart uniformly at the square [-1, 1) x [-1, 1).

    The argument is ignored; it only exists so the function can be used with
    ``RDD.map``. Returns 1 if the dart lands inside (or on) the unit circle,
    otherwise 0.
    """
    px = 2 * random() - 1
    py = 2 * random() - 1
    return int(px ** 2 + py ** 2 <= 1)

n = 10000  # number of darts to throw

# One partition, one dart per element; summing the 0/1 results counts the hits.
darts = sc.parallelize(range(1, n + 1), 1)
count = darts.map(throw_darts).reduce(add)

# Circle area / square area = pi / 4, so pi is about 4 * hits / darts.
print(f'Pi ~ {4 * count / n}')

Pi ~ 3.1704


## Read a book and count its words with map-reduce

Inspect the job's execution stages in the Spark UI: http://localhost:4040/stages

In [18]:
# Word count of a local text file, written to HDFS.
# str.split() with no argument splits on runs of any whitespace and never yields
# empty strings; split(" ") produced "" tokens for blank lines and repeated spaces,
# which were then counted as a "word".
counts = (
    sc.textFile("file:///home/tau/Downloads/2554-0.txt")
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

# saveAsTextFile writes a *directory* of part files (despite the .txt suffix) and
# fails if that directory already exists; remove it (e.g. `hdfs dfs -rm -r <path>`)
# before re-running this cell.
counts.saveAsTextFile("hdfs:///user/tau/2554-0_wordCount.txt")

## Filter an RDD

In [26]:
%%timeit
arr = range(10000)
rdd = sc.parallelize(arr, 4)
odds = rdd.filter(lambda x: x % 2 != 0)
odds.take(5)

41.6 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## Using SciPy

In [11]:
import pyspark.sql.functions as f
import pandas as pd
from scipy import stats

N_ROWS = 1_000_000  # size of the benchmark DataFrame
RAND_SEED = 42      # fixed seed so the random column is reproducible between runs

# `id` comes from spark.range; `val` holds uniform random numbers in [0, 1).
# With a fixed seed (and unchanged partitioning) rand() yields the same values on
# every run, so the outputs below can be reproduced.
big_df = (spark.range(0, N_ROWS)
               .withColumn('val', f.rand(seed=RAND_SEED)))

# Cache it because the UDF timing cells below reuse the same data.
big_df.cache()
big_df.show(3)

+---+------------------+
| id|               val|
+---+------------------+
|  0|0.9453879644531373|
|  1|0.7138517107833131|
|  2|0.6439455957156267|
+---+------------------+
only showing top 3 rows



In [12]:
# Gaussian PDF as a *pandas* (vectorized) UDF: Spark hands the function whole
# batches of `val` as a pandas Series and expects a Series of the same length back.
# NOTE(review): PandasUDFType is deprecated in Spark 3.x in favor of type hints
# (`def pandas_pdf(v: pd.Series) -> pd.Series`); kept here for older Spark versions.
@f.pandas_udf('double', f.PandasUDFType.SCALAR)
def pandas_pdf(v):
    """Return the standard normal PDF evaluated element-wise on the Series ``v``."""
    return pd.Series(stats.norm.pdf(v))

big_df.withColumn('probability', pandas_pdf(big_df.val)).show(5)




+---+------------------+-------------------+
| id|               val|        probability|
+---+------------------+-------------------+
|  0|0.9453879644531373|0.25517192766594166|
|  1|0.7138517107833131|0.30921122310314325|
|  2|0.6439455957156267|  0.324239934612677|
|  3|0.7710043804297324|0.29636531582975634|
|  4|0.6429033620151852| 0.3244574424054605|
+---+------------------+-------------------+
only showing top 5 rows



In [13]:
# Slow method
def test_pandas_pdf():
    return (big_df
           .withColumn('probability', pandas_pdf(big_df.val))
           .agg(f.count(f.col('probability')))
           #.show()
           )

%timeit -n 1 test_pandas_pdf()

27.7 ms ± 14.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
# Fast method - Vectorised UDF (User defined function)
@f.udf('double')
def pdf(v):
    return float(stats.norm.pdf.pdf(v))

def test_pdf():
    return (big_df
           .withColumn('probability', pdf(big_df.val))
           .agg(f.count(f.col('probability')))
           #.show()
           )

%timeit -n 1 test_pdf()

21.8 ms ± 4.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Temporary views

In [18]:
# Register data_df under a session-scoped name so it can be queried with SQL.
# createTempView raises if a view with this name already exists, so re-running this
# cell fails; createOrReplaceTempView (next cell) does not.
data_df.createTempView('data_df_view')
animal_query = '''SELECT Animal FROM data_df_view'''
spark.sql(animal_query).show()

+------+
|Animal|
+------+
|   Cat|
|   Dog|
|Parrot|
+------+



In [27]:
# Create the temp view, or replace it if it already exists, then count animals per leg count.
data_df.createOrReplaceTempView('data_df_view')
legs_query = '''SELECT Legs, COUNT(*) AS AnimalCount FROM data_df_view
             GROUP BY Legs'''
spark.sql(legs_query).show()

+----+-----------+
|Legs|AnimalCount|
+----+-----------+
|   2|          1|
|   4|          2|
+----+-----------+



### Other operations

In [30]:
data_df.select(['Animal', 'Legs']).show()  # project two columns (select accepts a list)

+------+----+
|Animal|Legs|
+------+----+
|   Cat|   4|
|   Dog|   4|
|Parrot|   2|
+------+----+



In [32]:
data_df.where(data_df['Legs'] == 4).show()  # where() is an alias of filter()

+---+------+----+----------+
| ID|Animal|Legs|Popularity|
+---+------+----+----------+
|  1|   Cat|   4|        10|
|  2|   Dog|   4|         9|
+---+------+----+----------+



In [33]:
# Number of animals per leg count. groupBy gives no ordering guarantee, so sort
# explicitly to make the displayed table reproducible.
data_df.groupBy('Legs').count().orderBy('Legs').show()

+----+-----+
|Legs|count|
+----+-----+
|   2|    1|
|   4|    2|
+----+-----+



In [34]:
# Collect the (small) Spark DataFrame to the driver as a pandas DataFrame.
df = data_df.toPandas()
df.__class__

pandas.core.frame.DataFrame

## Hive

To install Hive:
- Download Hive 3.1.2: http://archive.apache.org/dist/hive/hive-3.1.2/
- Add to .bashrc:
    ```bash
    export HIVE_HOME=/home/tau/apache-hive-3.1.2-bin
    export PATH=$PATH:$HIVE_HOME/bin
    ```
- You may need to copy in a compatible jar file (see https://issues.apache.org/jira/browse/HIVE-22915).

- This setup requires MySQL to be installed:
```bash
sudo apt install mysql-server
sudo mysql_secure_installation
```
- It also requires a `hive-site.xml` file in Hive's `conf` directory (https://stackoverflow.com/questions/35449274/java-lang-runtimeexception-unable-to-instantiate-org-apache-hadoop-hive-ql-meta).

In [None]:
# Save the CSV DataFrame as a table in ORC format.
# NOTE(review): HiveContext is deprecated since Spark 2.0 (a SparkSession built with
# .enableHiveSupport() is the modern route). Note also that the write below goes
# through data_csv's own session (`spark`), so `hc` is not used by it.
from pyspark.sql import HiveContext
hc = HiveContext(sc)

# The CSV DataFrame defined earlier is `data_csv` (`df_csv` does not exist).
# mode("overwrite") keeps the cell re-runnable; the default save mode raises an
# error when the table already exists.
# NOTE(review): the table holds iris data despite the name "employees".
data_csv.write.format("orc").mode("overwrite").saveAsTable("employees")