# POC - PySpark
'''PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, 
but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports 
most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.'''
## It`s great to use when we are dealing with huge amounts of data

## This is a notebook containing the most useful functions for data manipulation, it`s based on https://towardsdatascience.com/beginners-guide-to-pyspark-bbe3b553b79f

## In that example, we are manipulating a stock price dataset from kaggle using several pyspark functions

## To download the dataset, please access https://www.kaggle.com/dinnymathew/usstockprices

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f

In [45]:
# Initializating session
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

In [46]:
# Before structuring schema

data = spark.read.csv(
    'data/stocks_price_final.csv',
    sep = ',',
    header = True,
    )

data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [70]:
# Defining schema

data_schema = [
               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               StructField('date', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = data_schema)

data = spark.read.csv(
    'data/stocks_price_final.csv',
    sep = ',',
    header = True,
    schema = final_struc 
    )

data = data.drop('_c0')

data.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [71]:
# Show schema
data.schema

StructType(List(StructField(symbol,StringType,true),StructField(date,DateType,true),StructField(open,DoubleType,true),StructField(high,DoubleType,true),StructField(low,DoubleType,true),StructField(close,DoubleType,true),StructField(volume,IntegerType,true),StructField(adjusted,DoubleType,true),StructField(market.cap,StringType,true),StructField(sector,StringType,true),StructField(industry,StringType,true),StructField(exchange,StringType,true)))

In [72]:
# Show schema

data.dtypes

[('symbol', 'string'),
 ('date', 'date'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'int'),
 ('adjusted', 'double'),
 ('market.cap', 'string'),
 ('sector', 'string'),
 ('industry', 'string'),
 ('exchange', 'string')]

In [73]:
# Showing the first 4 rows

data.head(4)

[Row(symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capital Goods', industry='Biotech

In [74]:
# Showing the first rows

data.show()

+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-18|56.849998|    62.27|55.650002|     62.0|1589600|  

In [75]:
# Showing the first 4 rows

data.take(4)

[Row(symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(symbol='TXG', date=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capital Goods', industry='Biotech

In [76]:
# Showing columns

data.columns

['symbol',
 'date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market.cap',
 'sector',
 'industry',
 'exchange']

In [77]:
# Counting data

data.count()

                                                                                

1729034

In [78]:
# Showing schema

data.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [58]:
# Adding column

data = data.withColumn('date', data.date)

data.show(5)

+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-18|56.849998|    62.27|55.650002|     62.0|1589600|  

In [59]:
# Rename columnn

#data = data.withColumnRenamed('date', 'data_changed')

data.show(5)

+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-18|56.849998|    62.27|55.650002|     62.0|1589600|  

In [60]:
# Deleting column

#data = data.drop('data_changed')

data.show(5)

+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2019-09-18|56.849998|    62.27|55.650002|     62.0|1589600|  

In [61]:
# Dealing with missing data

# Remove Rows with Missing Values

#data.na.drop()

# Replacing Missing Values with Mean

data.na.fill(data.select(f.mean(data['open'])).collect()[0][0])

# Replacing Missing Values with new values

#data.na.replace(old_value, new_vallue)

                                                                                

DataFrame[symbol: string, date: date, open: double, high: double, low: double, close: double, volume: int, adjusted: double, market.cap: string, sector: string, industry: string, exchange: string]

# Querying data

In [62]:
# Select

## Selecting Single Column

data.select('sector').show(5)

## Selecting Multiple columns

data.select(['open', 'close', 'adjusted']).show(5)

+-------------+
|       sector|
+-------------+
|Capital Goods|
|Capital Goods|
|Capital Goods|
|Capital Goods|
|Capital Goods|
+-------------+
only showing top 5 rows

+---------+---------+---------+
|     open|    close| adjusted|
+---------+---------+---------+
|     54.0|    52.75|    52.75|
|    52.75|    52.27|    52.27|
|52.450001|55.200001|55.200001|
|56.209999|56.779999|56.779999|
|56.849998|     62.0|     62.0|
+---------+---------+---------+
only showing top 5 rows



In [63]:
# Filtering

from pyspark.sql.functions import col, lit

data.filter( (col('date') >= lit('2020-01-01')) & (col('date') <= lit('2020-01-31')) ).show(5)


+------+----------+---------+---------+---------+---------+------+---------+----------+-------------+--------------------+--------+
|symbol|      date|     open|     high|      low|    close|volume| adjusted|market.cap|       sector|            industry|exchange|
+------+----------+---------+---------+---------+---------+------+---------+----------+-------------+--------------------+--------+
|   TXG|2020-01-02|76.910004|77.989998|71.480003|72.830002|220200|72.830002|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-03|71.519997|76.188004|70.580002|75.559998|288300|75.559998|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-06|75.269997|77.349998|73.559998|75.550003|220600|75.550003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-07|     76.0|77.279999|    75.32|75.980003|182400|75.980003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-08|76.089996|76.949997|72.739998|74.839996|172100|74.839996|

In [64]:
# Between

## fetch the data where the adjusted value is between 100.0 and 500.0

data.filter(data.adjusted.between(100.0, 500.0)).show()

+------+----------+----------+----------+----------+----------+------+----------+----------+-------------+--------------------+--------+
|symbol|      date|      open|      high|       low|     close|volume|  adjusted|market.cap|       sector|            industry|exchange|
+------+----------+----------+----------+----------+----------+------+----------+----------+-------------+--------------------+--------+
|   TXG|2020-01-24| 95.459999|     101.0| 94.157997|100.790001|328100|100.790001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-27| 99.760002|104.892998| 97.019997|103.209999|334900|103.209999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|   TXG|2020-01-28|104.620003|108.269997|103.297997|106.620003|245400|106.620003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  ABMD|2019-01-02|315.940002|320.709991|307.029999|309.959991|590000|309.959991|   $13.39B|  Health Care|Medical/Dental In...|  NASDAQ|
|  ABMD|2019-01-03|    307.25| 311.73999|

In [65]:
# When


data.select(col("*"), 
            f.when(data.adjusted >= 200.0, 1).otherwise(0)
           ).show(5)

+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+-----------------------------------------------+
|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|CASE WHEN (adjusted >= 200.0) THEN 1 ELSE 0 END|
+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+-----------------------------------------------+
|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|                                              0|
|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|                                              0|
|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital

In [66]:
# Like 

data.select('sector', 
            data.sector.rlike('^[B,C]').alias('Sector Starting with B or C')
            ).distinct().show()

[Stage 50:>                                                         (0 + 8) / 8]

+--------------------+---------------------------+
|              sector|Sector Starting with B or C|
+--------------------+---------------------------+
|         Health Care|                      false|
|       Capital Goods|                       true|
|Consumer Non-Dura...|                       true|
|    Public Utilities|                      false|
|   Consumer Durables|                       true|
|             Finance|                      false|
|      Transportation|                      false|
|       Miscellaneous|                      false|
|   Consumer Services|                       true|
|              Energy|                      false|
|    Basic Industries|                       true|
|          Technology|                      false|
+--------------------+---------------------------+



                                                                                

In [67]:
# Group by

data.select('sector', 
            data.sector.rlike('^[B,C]').alias('Sector Starting with B or C')
            ).distinct().show()

[Stage 53:>                                                         (0 + 8) / 8]

+--------------------+---------------------------+
|              sector|Sector Starting with B or C|
+--------------------+---------------------------+
|         Health Care|                      false|
|       Capital Goods|                       true|
|Consumer Non-Dura...|                       true|
|    Public Utilities|                      false|
|   Consumer Durables|                       true|
|             Finance|                      false|
|      Transportation|                      false|
|       Miscellaneous|                      false|
|   Consumer Services|                       true|
|              Energy|                      false|
|    Basic Industries|                       true|
|          Technology|                      false|
+--------------------+---------------------------+



                                                                                

In [68]:
# Aggregation

data.filter( (col('date') >= lit('2019-01-02')) & (col('date') <= lit('2020-01-31')) )\
    .groupBy("sector") \
    .agg(f.min("date").alias("From"), 
         f.max("date").alias("To"), 
         
         f.min("open").alias("Minimum Opening"),
         f.max("open").alias("Maximum Opening"), 
         f.avg("open").alias("Average Opening"), 

         f.min("close").alias("Minimum Closing"), 
         f.max("close").alias("Maximum Closing"), 
         f.avg("close").alias("Average Closing"), 

         f.min("adjusted").alias("Minimum Adjusted Closing"), 
         f.max("adjusted").alias("Maximum Adjusted Closing"), 
         f.avg("adjusted").alias("Average Adjusted Closing"), 

      ).show(truncate=False)



+---------------------+----------+----------+---------------+---------------+------------------+---------------+---------------+------------------+------------------------+------------------------+------------------------+
|sector               |From      |To        |Minimum Opening|Maximum Opening|Average Opening   |Minimum Closing|Maximum Closing|Average Closing   |Minimum Adjusted Closing|Maximum Adjusted Closing|Average Adjusted Closing|
+---------------------+----------+----------+---------------+---------------+------------------+---------------+---------------+------------------+------------------------+------------------------+------------------------+
|Miscellaneous        |2019-01-02|2020-01-31|0.16           |690.0          |51.51619596530091 |0.16           |691.099976     |51.54953412269035 |0.16                    |691.099976              |51.210507338518944      |
|Health Care          |2019-01-02|2020-01-31|0.072          |186000.0       |146.7756197567625 |0.071       

                                                                                

In [69]:
data.columns

['symbol',
 'date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market.cap',
 'sector',
 'industry',
 'exchange']