# PySpark Learning Notes

In [1]:
# Simple SQL query
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.sql.repl.eagerEval.enabled', True).getOrCreate()
df = spark.sql('''select 'spark' as hello ''')
df.show()


+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
from datetime import datetime, date
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])


In [4]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [5]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [6]:
# enabling more elegant look
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
4,5.0,string3,2000-03-01,2000-01-03 12:00:00


In [7]:
# adding a new row to the existing dataframe
df2 = spark.createDataFrame([Row(a=4, b=8.4, c='string4', d=date(2021, 8, 1), e=datetime(2021, 8, 1, 13, 46))])

In [37]:
df = df.union(df2)
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
|  4|8.4|string4|2021-08-01|2021-08-01 13:46:00|
|  4|8.4|string4|2021-08-01|2021-08-01 13:46:00|
+---+---+-------+----------+-------------------+



In [38]:
# spark user defined functions
import pandas as pd
from pyspark.sql.functions import pandas_udf


@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1


df.select(pandas_plus_one(df.a)).show()


+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 5|
|                 5|
|                 5|
+------------------+



In [39]:
# checking the grouping and aggregation functions
df.groupBy("a").sum().show()

+---+------+------+
|  a|sum(a)|sum(b)|
+---+------+------+
|  1|     1|   2.0|
|  2|     2|   3.0|
|  4|    12|  21.8|
+---+------+------+



In [None]:
# Storing the dataframe to a csv
df.write.csv('file:///bar.csv"',mode="append", quoteAll=True)

In [None]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
|  4|8.4|string4|2021-08-01|2021-08-01 13:46:00|
+---+---+-------+----------+-------------------+



# SPARK SQL - Creating a view

In [None]:
# creating a view or consider it as a temp table where you can run SQL queries on them
df.createOrReplaceTempView("demoTable")
spark.sql("SELECT count(*) from demoTable").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



In [52]:
spark.sql("select * from demoTable where a = 4").show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
|  4|8.4|string4|2021-08-01|2021-08-01 13:46:00|
+---+---+-------+----------+-------------------+

