# PySpark Fundamentals

### List of Topics
1. Initializing SparkSession
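2. Create DataFrame
3. View Dataframe Contents
4. collect(), take(), tail()
5. CSV
6. Parquet
7. ORC Files
8. Dataframe and Spark SQL
9. UDF: SQL user defined functions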

#### Initializing SparkSession

In [1]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used by the rest of the notebook: an already
# running session is reused, otherwise a new one is started.
spark = (
    SparkSession.builder
    .getOrCreate()
)

23/02/07 22:55:28 WARN Utils: Your hostname, Nihars-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.162 instead (on interface en0)
23/02/07 22:55:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/07 22:55:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Create DataFrame

In [2]:
from datetime import datetime, date
from pyspark.sql import Row

# Build a small three-row demo DataFrame from Row objects. No schema is given,
# so the column types are inferred from the Python values in the rows.
df = spark.createDataFrame(
    data=[
        Row(a=1, b=2.0, c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)

In [None]:
# Print the column names and data types of df in tree form.
df.printSchema()


In [None]:
# Create the DataFrame with explicitly declared column types: the schema is a
# DDL-formatted string of "name type" pairs instead of being inferred.
data = [
    Row(a=1, b=2.0, c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3.0, c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]
schema = 'a long, b double, c string, d date, e timestamp'

df = spark.createDataFrame(data, schema)

#### View Dataframe Contents

In [None]:
# Print the DataFrame as a text table (show() displays the first 20 rows by default).
df.show()

In [None]:
# vertical=True prints each row as its own block of "column | value" lines,
# which is easier to read when a row is too wide for a table.
df.show(vertical=True)

In [None]:
# describe() returns a new DataFrame of summary statistics (count, mean,
# stddev, min, max) for the numeric and string columns; show() prints it.
df.describe().show()

#### collect(), take(), tail()

`DataFrame.collect()`: Collects ALL of the distributed data to the driver as local Python data.
> df.collect()


`DataFrame.take(n)`: Collects the first n rows of the distributed data to the driver as local Python data.
> df.take(2)

`DataFrame.tail(n)`: Collects the last n rows of the distributed data to the driver as local Python data.
> df.tail(2)

In [None]:
# Bring every row to the driver as a list of Row objects. Only do this on
# small DataFrames: the whole dataset must fit in driver memory.
df.collect()

In [None]:
# Return only the first 2 rows to the driver as a list of Row objects.
df.take(2)


In [None]:
# Return only the last 2 rows to the driver as a list of Row objects.
df.tail(2)

#### CSV

In [None]:
# Write df as CSV with a header line. Spark creates a directory named
# 'temp/foo.csv' that holds the part files.
# mode='overwrite' replaces output left by an earlier run; with the default
# save mode the write raises an error when the path already exists, so the
# cell could only ever be executed once.
df.write.csv('temp/foo.csv', header=True, mode='overwrite')

In [None]:
# CSV files carry no type information: without a schema every column is read
# back as a string, which would break the numeric operations performed on
# column `a` later in the notebook. Supplying the DDL schema (the same column
# definitions used when the DataFrame was created) restores the types.
df = spark.read.csv(
    'temp/foo.csv',
    header=True,
    schema='a long, b double, c string, d date, e timestamp',
)
df.show()

#### Parquet

In [15]:
# Write df in Parquet format (a directory of part files named 'bar.parquet').
# mode='overwrite' replaces output left by an earlier run; with the default
# save mode the write raises an error when the path already exists.
df.write.parquet('bar.parquet', mode='overwrite')

                                                                                

In [16]:
# Read the Parquet data back into df and print it. Parquet files store the
# schema, so no schema needs to be passed here.
# Note: rows may come back in a different order than they were written.
df = spark.read.parquet('bar.parquet')
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



#### ORC Files

In [4]:
# Write df in ORC format (a directory of part files named 'zoo.orc').
# mode='overwrite' replaces output left by an earlier run; with the default
# save mode the write raises an error when the path already exists.
df.write.orc('zoo.orc', mode='overwrite')

                                                                                

In [5]:
# Read the ORC data back into df and print it. ORC files store the schema,
# so no schema needs to be passed here.
# Note: rows may come back in a different order than they were written.
df = spark.read.orc('zoo.orc')
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



#### Dataframe and Spark SQL

In [12]:
# Expose df to Spark SQL under the name "tableA" (replacing any temp view that
# already has that name), then query it with plain SQL and print the result.
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



#### UDF: SQL user defined functions

In [14]:
import pandas as pd
# NOTE(review): Column and upper are not used in this cell; they are kept in
# case other cells rely on them being imported here.
from pyspark.sql import Column
from pyspark.sql.functions import pandas_udf, upper


@pandas_udf("long")
def add_one(s: pd.Series) -> pd.Series:
    """Return the input column with 1 added to every value.

    This is a vectorized pandas UDF: Spark passes the column to the function
    in batches as ``pd.Series`` and expects a Series of the same length back.

    The return type is declared as ``long`` so that it is at least as wide as
    the ``long`` column ``a`` the function is applied to; declaring a narrower
    ``integer`` result would be a narrowing conversion for large values.

    Args:
        s: A batch of numeric column values.

    Returns:
        A Series holding ``s + 1`` element-wise.
    """
    return s + 1


# Register the UDF under a SQL-visible name so it can be called from
# spark.sql(). Re-running this cell replaces the earlier registration,
# which Spark reports with a warning.
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(a) FROM tableA").show()

23/02/07 23:02:14 WARN SimpleFunctionRegistry: The function add_one replaced a previously registered function.


                                                                                

+----------+
|add_one(a)|
+----------+
|         3|
|         5|
|         2|
+----------+

