
# What is PySpark?

In today's lecture, we will cover PySpark, the Python API for Spark. Spark is a framework for cluster computing: it lets you spread data and computations over a cluster with multiple nodes. For explanation's sake, you can think of a node as a separate computer, with all of these computers working together to perform a task. This is ideal for big data analysis, because splitting the data up means each node only has to work with a small portion of it.

Each node works on its own subset of the total data and carries out part of the total calculations, which allows both data processing and computation to be parallelized over the cluster. Parallel computation is a handy tool because it can make certain types of programming tasks much faster. Be careful, though: distributing work across a cluster adds complexity and overhead, so Spark is not always the best (or fastest) choice, especially for small datasets.
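To make the split-then-combine idea concrete, here is a plain-Python sketch (no Spark involved) of how an average can be computed in parallel: the data is cut into partitions, each worker computes a partial sum and count for its own partition, and the partial results are merged at the end. The threads stand in for cluster nodes and the function names are hypothetical; this illustrates the idea, not how Spark is implemented internally.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(values, num_partitions):
    """Cut a list into at most num_partitions roughly equal slices."""
    size = -(-len(values) // num_partitions)  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]

def partial_sum_and_count(partition):
    """The work one 'node' does on its own slice: a small partial result."""
    return sum(partition), len(partition)

def parallel_mean(values, num_partitions=4):
    if not values:
        raise ValueError("need at least one value")
    partitions = split_into_partitions(values, num_partitions)
    # Each thread plays the role of a node and only sees its own partition.
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        partials = list(pool.map(partial_sum_and_count, partitions))
    # Combine step: merge the per-partition sums and counts.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

prices = [54.0, 52.75, 52.45, 56.21, 56.85, 13.0, 12.72, 10.33]  # illustrative values
print(parallel_mean(prices, num_partitions=3))
```

Each worker returns a (sum, count) pair rather than its own average on purpose: averaging the per-partition averages would give the wrong answer whenever the partitions have different sizes.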


In [None]:
!pip install pyspark
from pyspark.sql import SparkSession

Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this lecture, you'll be using the Spark DataFrame abstraction built on top of RDDs.

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). DataFrames are not only easier to understand than RDDs but also better optimized for complicated operations, because Spark can plan and optimize DataFrame queries before running them.

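To start working with Spark DataFrames, we first have to create a SparkSession object. The SparkSession is our entry point to Spark, and it manages the underlying SparkContext (the connection to the cluster) for us.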

In [None]:
# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7f28abfce490>


In [None]:
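# Configure a session explicitly: run locally using all available cores ("local[*]").
# Note: getOrCreate() returns the session that is already running (my_spark above)
# if there is one, so settings such as master and appName may not take effect here.
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()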

In [None]:
# List the tables and temporary views registered in the session's catalog (none yet)
print(spark.catalog.listTables())

[]


Let's load some data to work with. We'll use US stock price data from Kaggle to explore PySpark:
https://www.kaggle.com/dinnymathew/usstockprices

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, DateType, DoubleType)

# Explicit schema: StructField(column name, data type, nullable) for each CSV column
data_schema = [
               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               StructField('data', DateType(), True),   # trading date (named 'data' in this schema)
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),  # contains a dot: refer to it as F.col('`market.cap`')
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = data_schema)

data = spark.read.csv('stocks_price_final.csv',sep = ',', header = True, schema = final_struc)
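Passing `schema=` tells Spark each column's type up front, instead of reading every field as a string (which is what `spark.read.csv` does when no schema is given and `inferSchema` is off). As a rough plain-Python sketch of what that buys you, the hypothetical `apply_schema` helper below parses a few made-up CSV lines using a small subset of the schema, converting each field to its declared type and falling back to `None` for empty or unparsable values. This is a simplification for illustration, not Spark's CSV reader.

```python
import csv
import io
from datetime import date

# Hypothetical mini-schema: (column name, converter) pairs, loosely mirroring
# a few of the StructFields above.
SCHEMA = [
    ("symbol", str),
    ("data", date.fromisoformat),
    ("open", float),
    ("volume", int),
]

def apply_schema(csv_text, schema):
    """Parse CSV text with a header row into dicts of typed values.

    Empty or unparsable fields become None (a simplified take on a nullable column).
    """
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row, as header=True does
    rows = []
    for record in reader:
        row = {}
        for (name, convert), raw in zip(schema, record):
            try:
                row[name] = convert(raw) if raw != "" else None
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows

sample = "symbol,date,open,volume\nTXG,2019-09-12,54.0,7326300\nTXG,2019-09-13,,oops\n"
for r in apply_schema(sample, SCHEMA):
    print(r)
```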

In [None]:
data.show(5)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

As you've seen with SQL, we can use aggregate functions such as sum, count, and avg on our data.

In [None]:
## Find the average opening and closing stock prices

for col_name in ['open', 'close']:
    avg_price = data.select(F.avg(col_name)).collect()[0][0]
    print(f'Average {col_name} Stock Price: {avg_price}')

Average open Stock Price: 15070.071703341051
Average close Stock Price: 15032.714854330707
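Like SQL's AVG, Spark's `avg` skips null values: the result is the sum of the non-null values divided by how many there are, and it is null if every value is null. A plain-Python sketch of that rule (`sql_avg` is a hypothetical helper, not part of PySpark):

```python
def sql_avg(values):
    """Average that ignores None, the way SQL/Spark AVG ignores nulls."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return None  # AVG over only nulls is null
    return sum(non_null) / len(non_null)

closes = [52.75, None, 55.2, 56.78]  # illustrative values with one missing price
print(sql_avg(closes))  # averages the three non-null prices
```

In PySpark itself, both averages can also be computed in a single pass with `data.agg(F.avg('open'), F.avg('close'))`, rather than launching one job per column as the loop above does.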


The collect_set function returns the distinct (non-null) values of a given column, gathered into a single array.

In [None]:
data.select(F.collect_set("exchange")).show(truncate=False)

+---------------------+
|collect_set(exchange)|
+---------------------+
|[NYSE, NASDAQ]       |
+---------------------+
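Two details of collect_set are easy to miss: null values are dropped, and the order of the returned array is not guaranteed (so [NYSE, NASDAQ] could just as well come back as [NASDAQ, NYSE]). Because every distinct value ends up in one array, it is best suited to low-cardinality columns such as exchange. The helper below is a hypothetical plain-Python stand-in that mimics those semantics:

```python
def collect_set_sketch(values):
    """Distinct non-null values as a list; like F.collect_set, order is not guaranteed."""
    return list({v for v in values if v is not None})

exchanges = ["NASDAQ", "NASDAQ", "NYSE", None, "NYSE"]  # made-up column values
print(collect_set_sketch(exchanges))  # e.g. ['NYSE', 'NASDAQ'] or ['NASDAQ', 'NYSE']
```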



In [None]:
data.select(F.count("sector")).show(truncate=False)

+-------------+
|count(sector)|
+-------------+
|1729034      |
+-------------+



In [None]:
data.select(F.countDistinct("sector")).show(truncate=False)

+----------------------+
|count(DISTINCT sector)|
+----------------------+
|12                    |
+----------------------+
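`count(col)` and `countDistinct(col)` both ignore nulls: count returns the number of non-null values (1,729,034 for sector above), while countDistinct returns how many different non-null values there are (12 sectors). To count every row regardless of nulls, use `data.count()` instead. A plain-Python sketch of the two column-level rules, with hypothetical helper names and made-up sector values:

```python
def count_non_null(values):
    """Like F.count(col): number of values that are not null."""
    return sum(1 for v in values if v is not None)

def count_distinct_non_null(values):
    """Like F.countDistinct(col): number of different non-null values."""
    return len({v for v in values if v is not None})

sectors = ["Technology", "Finance", None, "Technology", "Health Care"]  # made-up values
print(len(sectors), count_non_null(sectors), count_distinct_non_null(sectors))  # prints: 5 4 3
```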



In [None]:
data.select(F.kurtosis("volume")).show(truncate=False)
data.select(F.skewness("volume")).show(truncate=False)
data.select(F.stddev("volume")).show(truncate=False)
data.select(F.variance("volume")).show(truncate=False)


+------------------+
|kurtosis(volume)  |
+------------------+
|1159.3465390444946|
+------------------+

+------------------+
|skewness(volume)  |
+------------------+
|22.534251558551144|
+------------------+
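A skewness of about 22.5 and a kurtosis of about 1159 say that volume is extremely right-skewed with a very heavy tail: most days trade modest volume, and a few days trade enormous amounts. To the best of our understanding of Spark's documentation and source, `skewness` and `kurtosis` are population statistics, with kurtosis reported as excess kurtosis (a normal distribution scores about 0), whereas `stddev` and `variance` are aliases for the sample versions `stddev_samp` and `var_samp`, which divide by n-1. The sketch below reproduces those formulas in plain Python so you can check them on a small list; the helper names are hypothetical.

```python
import math
import statistics

def _central_moment_sums(xs):
    """Return n and the sums of 2nd, 3rd and 4th powers of deviations from the mean."""
    n = len(xs)
    mean = sum(xs) / n
    m2 = sum((x - mean) ** 2 for x in xs)
    m3 = sum((x - mean) ** 3 for x in xs)
    m4 = sum((x - mean) ** 4 for x in xs)
    return n, m2, m3, m4

def population_skewness(xs):
    """sqrt(n) * m3 / m2**1.5, the population skewness formula."""
    n, m2, m3, _ = _central_moment_sums(xs)
    if m2 == 0:
        return None  # undefined for a constant column
    return math.sqrt(n) * m3 / m2 ** 1.5

def excess_kurtosis(xs):
    """n * m4 / m2**2 - 3, the population excess kurtosis formula."""
    n, m2, _, m4 = _central_moment_sums(xs)
    if m2 == 0:
        return None  # undefined for a constant column
    return n * m4 / m2 ** 2 - 3.0

volumes = [7326300, 1025200, 269900, 602800]  # first volumes shown by data.show(5)
print("skewness:", population_skewness(volumes))
print("kurtosis:", excess_kurtosis(volumes))
# Sample (n - 1) versions, corresponding to F.stddev and F.variance:
print("stddev:", statistics.stdev(volumes))
print("variance:", statistics.variance(volumes))
```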



In [None]:
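from pyspark.sql.window import Window

# Define a window: one partition per sector, rows ordered by industry within it
windowSpec = Window.partitionBy("sector").orderBy("industry")

# row_number() numbers the rows 1, 2, 3, ... within each partition
data.withColumn("row_number", F.row_number().over(windowSpec)).show(10, truncate=False)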

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
|_c0 |symbol|data      |open |high |low  |close|volume|adjusted|market.cap|sector       |industry         |exchange|row_number|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
|4253|KRKR  |2019-11-08|13.0 |14.5 |12.58|13.06|479100|13.06   |$130.48M  |Miscellaneous|Business Services|NASDAQ  |1         |
|4254|KRKR  |2019-11-11|12.72|12.78|10.7 |10.74|226200|10.74   |$130.48M  |Miscellaneous|Business Services|NASDAQ  |2         |
|4255|KRKR  |2019-11-12|10.33|10.96|8.91 |8.92 |186800|8.92    |$130.48M  |Miscellaneous|Business Services|NASDAQ  |3         |
|4256|KRKR  |2019-11-13|8.88 |9.73 |8.72 |9.21 |117000|9.21    |$130.48M  |Miscellaneous|Business Services|NASDAQ  |4         |
|4257|KRKR  |2019-11-14|9.04 |9.49 |9.04 |9.2  |59700 |9.2     |$130.48M  |Miscellaneous|Business Servic

In [None]:
data.withColumn("rank", F.rank().over(windowSpec)).show(5)
data.withColumn("dense_rank", F.dense_rank().over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market.cap|       sector|         industry|exchange|rank|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
+----+------+----------+
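In the rank output above, every KRKR row shown gets rank 1: within the Miscellaneous partition the window is ordered only by industry, and all of these rows share the same industry, Business Services, so they tie. row_number still hands out 1, 2, 3, ..., but which tied row gets which number is not guaranteed; adding a tiebreaker to orderBy (for example the date column) makes it deterministic as long as the combination is unique. The plain-Python sketch below (a hypothetical helper and made-up rows, not Spark code) shows how the three functions treat ties: rank leaves gaps after a tie, dense_rank does not.

```python
from itertools import groupby

def add_rankings(rows, partition_key, order_key):
    """Return (row, row_number, rank, dense_rank) for each row, computed per partition.

    Rows are grouped by partition_key and sorted by order_key within each group,
    in the spirit of Window.partitionBy(...).orderBy(...).
    """
    result = []
    # sorted() is stable, so tied rows keep their input order here; Spark makes
    # no such promise for row_number() among ties.
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    for _, partition in groupby(ordered, key=partition_key):
        previous = object()  # sentinel that never equals a real value
        rank = dense_rank = 0
        for row_number, row in enumerate(partition, start=1):
            value = order_key(row)
            if value != previous:
                rank = row_number  # rank jumps past earlier ties (gaps)
                dense_rank += 1    # dense_rank moves to the next value (no gaps)
                previous = value
            result.append((row, row_number, rank, dense_rank))
    return result

# Made-up (sector, industry) rows for illustration
rows = [
    ("Miscellaneous", "Business Services"),
    ("Miscellaneous", "Publishing"),
    ("Miscellaneous", "Business Services"),
    ("Technology", "Semiconductors"),
    ("Technology", "Computer Software"),
    ("Technology", "Computer Software"),
]
for row, rn, rk, drk in add_rankings(rows, lambda r: r[0], lambda r: r[1]):
    print(row, rn, rk, drk)
```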