In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, TimestampType

from pyspark.sql.functions import col,isnan, when, count

In [2]:
# Cloud Storage location of the combined S&P 500 price history CSV.
data_file = "gs://stock-project-sp500/Data/S&P_500_Full_Stock_Data.csv"

# Obtain (or reuse) the Spark session for this notebook and keep a handle on
# its underlying SparkContext.
spark = (
    SparkSession.builder
    .appName("stock")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Explicit schema applied when reading the stock CSV: one entry per column,
# given as (name, Spark type, nullable flag) in file order.
# NOTE(review): IntegerType is a 32-bit signed integer. The null counts shown
# further down this notebook report many null 'Volume' values; confirm that
# every Volume in the CSV is a plain integer that fits in 32 bits.
stock_schema = (
    StructType()
    .add('Symbol', StringType(), False)
    .add('Date', TimestampType(), False)
    .add('Open', FloatType(), True)
    .add('High', FloatType(), True)
    .add('Low', FloatType(), True)
    .add('Close', FloatType(), True)
    .add('Adj Close', FloatType(), True)
    .add('Volume', IntegerType(), True)
    .add('Description', StringType(), False)
    .add('Category2', StringType(), False)
    .add('Category3', StringType(), False)
    .add('GICS Sector', StringType(), False)
)

In [21]:
# Numeric price/volume columns that are audited for missing values below.
non_null_columns = [
    'Open',
    'High',
    'Low',
    'Close',
    'Adj Close',
    'Volume',
]

In [4]:
# Load the CSV with the explicit schema (first line of the file is a header)
# and mark the resulting DataFrame for caching so later cells can reuse it.
stock_df = (
    spark.read
    .schema(stock_schema)
    .option("header", True)
    .csv(data_file)
    .cache()
)

In [22]:
from pyspark.sql.functions import isnan, when, count, col
# For each audited column, count the rows whose value is NULL or NaN and
# display the totals as a single-row table.
stock_df.select(
    [
        count(when(col(name).isNull() | isnan(name), name)).alias(name)
        for name in non_null_columns
    ]
).show()

+----+----+---+-----+---------+------+
|Open|High|Low|Close|Adj Close|Volume|
+----+----+---+-----+---------+------+
|   7|   7|  7|    7|        7|  9569|
+----+----+---+-----+---------+------+



In [28]:
# Display the column names of the loaded DataFrame.
stock_df.columns

['Symbol',
 'Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Adj Close',
 'Volume',
 'Description',
 'Category2',
 'Category3',
 'GICS Sector']

In [32]:
from pyspark.sql import functions as F
# Columns to aggregate per ticker. The grouping key ("Symbol") is not part of
# this list. It is derived from `non_null_columns` so this breakdown always
# covers the same columns as the overall null/NaN check; the copy keeps
# `columns` an independent list.
columns = list(non_null_columns)

# One aggregate per column: sum a 0/1 indicator that is 1 when the value is
# missing. "Missing" means NULL or NaN -- the same predicate as the overall
# check -- so the per-symbol counts add up to the overall totals.
agg_expression = [
    F.sum(
        F.when(stock_df[x].isNull() | F.isnan(stock_df[x]), 1).otherwise(0)
    ).alias(x)
    for x in columns
]

# One row per Symbol holding the missing-value count of every audited column.
null_values_by_stock = stock_df.groupby("Symbol").agg(*agg_expression)

In [38]:
# Add a per-symbol total ('SUM1') by adding up the individual column counts.
# Python's built-in sum starts from 0 and chains Column additions.
df2 = null_values_by_stock.withColumn('SUM1', sum(F.col(name) for name in columns))

# Show the 20 symbols with the most missing values first.
df2.sort(F.desc('SUM1')).show(20)

+------+----+----+---+-----+---------+------+----+
|Symbol|Open|High|Low|Close|Adj Close|Volume|SUM1|
+------+----+----+---+-----+---------+------+----+
|  BF-B|   1|   1|  1|    1|        1|  1367|1372|
|    EA|   1|   1|  1|    1|        1|  1367|1372|
|  NLOK|   1|   1|  1|    1|        1|  1367|1372|
|   AEE|   1|   1|  1|    1|        1|  1367|1372|
|   HPQ|   1|   1|  1|    1|        1|  1367|1372|
|   PVH|   1|   1|  1|    1|        1|  1367|1372|
|    CI|   1|   1|  1|    1|        1|  1367|1372|
|   AAP|   0|   0|  0|    0|        0|     0|   0|
|   AIZ|   0|   0|  0|    0|        0|     0|   0|
|   AME|   0|   0|  0|    0|        0|     0|   0|
|   AES|   0|   0|  0|    0|        0|     0|   0|
|   BDX|   0|   0|  0|    0|        0|     0|   0|
|  ABBV|   0|   0|  0|    0|        0|     0|   0|
|   CSX|   0|   0|  0|    0|        0|     0|   0|
|  ALGN|   0|   0|  0|    0|        0|     0|   0|
|    CL|   0|   0|  0|    0|        0|     0|   0|
|   ADM|   0|   0|  0|    0|   

In [36]:
# Preview five rows of the per-symbol missing-value counts (no ordering applied).
df2.show(5)

+------+----+----+---+-----+---------+------+----+
|Symbol|Open|High|Low|Close|Adj Close|Volume|SUM1|
+------+----+----+---+-----+---------+------+----+
|  ABMD|   0|   0|  0|    0|        0|     0|   0|
|   AVY|   0|   0|  0|    0|        0|     0|   0|
|   CRL|   0|   0|  0|    0|        0|     0|   0|
|   AXP|   0|   0|  0|    0|        0|     0|   0|
|   CDW|   0|   0|  0|    0|        0|     0|   0|
+------+----+----+---+-----+---------+------+----+
only showing top 5 rows

