# PYSPARK

In [1]:
from pyspark.sql import SparkSession

In [2]:
session = SparkSession.builder.getOrCreate()

21/09/18 01:51:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
rdd = session.sparkContext.parallelize([1,2,3])

In [7]:
# bring first 2 values of rdd to the driver
rdd.take(2)

[1, 2]

In [6]:
# return the length of the rdd
rdd.count()

3

In [8]:
# send all rdd data to the driver
# be careful tho, make sure the big data is not too big
rdd.collect()

[1, 2, 3]

In [9]:
df = session.createDataFrame(
    [[1,2,3], [4,5,6]], ["col1", "col2", "col3"]
)

In [10]:
df

DataFrame[col1: bigint, col2: bigint, col3: bigint]

In [13]:
df.show(1)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
+----+----+----+
only showing top 1 row



In [14]:
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



RDD's and dataframes are immutable.  
because concurrent processing often requires immutable data

Transformations are lazy loaded, and they don't actually run until you consume their results via an action

### dataframe manipulation

In [16]:
import pyspark.sql.functions as funcs
import pyspark.sql.types as types

In [17]:
def multiply_by_ten(num):
    return num*10

In [19]:
# udf stands for user defined function
multiply_udf = funcs.udf(multiply_by_ten, types.DoubleType())

transformed_df = df.withColumn('multiplied', multiply_udf('col1'))
transformed_df.show()

+----+----+----+----------+
|col1|col2|col3|multiplied|
+----+----+----+----------+
|   1|   2|   3|      null|
|   4|   5|   6|      null|
+----+----+----+----------+



In [22]:
import math

In [23]:
def take_log_in_all_columns(row: types.Row):
     old_row = row.asDict()
     new_row = {f'log({column_name})': math.log(value) 
                for column_name, value in old_row.items()}
     return types.Row(**new_row)

In [24]:
logarithmic_dataframe = df.rdd.map(take_log_in_all_columns).toDF()

### sql functionality

In [28]:
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [29]:
df.select("col1", "col2").show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   4|   5|
+----+----+



In [31]:
df.where("col1 = 4").show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   4|   5|   6|
+----+----+----+



In [34]:
# create temporal view
df.createOrReplaceTempView("table1")

In [35]:
# perform query on top of that view
df2 = session.sql("SELECT col1 AS f1, col2 as f2 from table1")

In [37]:
df2.show()

+---+---+
| f1| f2|
+---+---+
|  1|  2|
|  4|  5|
+---+---+



### column operations

In [39]:
df3 = df.withColumn("derived_column", df["col1"] + (df["col2"] * df["col3"]))
df3.show()

+----+----+----+--------------+
|col1|col2|col3|derived_column|
+----+----+----+--------------+
|   1|   2|   3|             7|
|   4|   5|   6|            34|
+----+----+----+--------------+



### aggregation and quick stats

In [40]:
ADULT_COLUMN_NAMES = [
     "age",
     "workclass",
     "fnlwgt",
     "education",
     "education_num",
     "marital_status",
     "occupation",
     "relationship",
     "race",
     "sex",
     "capital_gain",
     "capital_loss",
     "hours_per_week",
     "native_country",
     "income"
 ]

csv_df = session.read.csv("adult.data.csv", header=False, inferSchema=True)

In [45]:
for new_col, old_col in zip(ADULT_COLUMN_NAMES, csv_df.columns):
     csv_df = csv_df.withColumnRenamed(old_col, new_col)

In [48]:
csv_df.describe().show()

[Stage 37:>                                                         (0 + 1) / 1]

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|marital_status|       occupation|relationship|               race|    sex|      capital_gain|    capital_loss|    hours_per_week|native_country|income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.58164675532

                                                                                

In [49]:
work_hours_df = csv_df.groupBy("age").agg(funcs.avg("hours_per_weeK"), funcs.stddev_samp('hours_per_week')).sort("age")
work_hours_df.show()

+---+-------------------+---------------------------+
|age|avg(hours_per_weeK)|stddev_samp(hours_per_week)|
+---+-------------------+---------------------------+
| 17| 21.367088607594937|         10.021014993616216|
| 18| 25.912727272727274|         11.733362123434848|
| 19| 30.678370786516854|         12.119154493614719|
| 20|  32.28021248339974|         11.726599330994663|
| 21|  34.03472222222222|         12.040389374051912|
| 22|  35.17124183006536|         11.968466821743275|
| 23|  36.71835803876853|         10.916632739093428|
| 24|  39.08897243107769|         10.638975889466733|
| 25|  40.00713436385256|         10.452953398659348|
| 26|  41.06496815286624|          11.29552504314252|
| 27| 42.039520958083834|         10.755941741375546|
| 28|  42.02768166089965|         10.737113530868324|
| 29|  42.36531365313653|         10.206157095904361|
| 30| 42.167247386759584|         10.990266114829758|
| 31| 42.877252252252255|         11.008740019442087|
| 32| 42.878019323671495|   

### Database connectivity

```python
# create a session aware of the JDBC driver
session = SparkSession.builder.config(
    'spark.jars', 'bin/postgresql-42.2.16.jar'
).config(
    'spark.driver.extraClassPath', 'bin/postgresql-42.2.16.jar'
).getOrCreate()
```

```python
# read from the database
url = f"jdbc:postgresql://your_host_ip:5432/your_database"
properties = {'user': 'your_user', 'password': 'your_password'}
# read from a table into a dataframe
df = session.read.jdbc(
    url=url, table='your_table_name', properties=properties
)
```

```python
### writing a pyspark dataframe to the database
transformed_df.write.jdbc(
    url=url, table='new_table', mode='append', properties=properties
)
```