In [9]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Attach to the active SparkSession, or start a new one if none exists;
# every later cell reads from and runs queries through `spark`.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, IntegerType
from pyspark.sql.functions import split

# Explicit schema for the raw quotes file: four CSV fields read as
# stock identifier, trade date, price and volume.
schema = StructType([
    StructField("stock", StringType()),
    StructField("date", DateType()),
    StructField("price", DoubleType()),
    StructField("volume", DoubleType())
])

# Dates in the file are parsed with the MM/dd/yyyy pattern.
df = spark.read.csv("data/raw/MS1.txt", schema=schema, dateFormat="MM/dd/yyyy")

# The "stock" field looks like "<number>.<name>" (e.g. "32843.Nordamerika_..._Inc._CRAI").
# Split on the FIRST dot only (limit=2) so dots inside the name, such as "Inc.",
# stay in stockName. The pattern is a Java regex, so the dot is escaped; a raw
# string is used so Python does not warn about the invalid escape "\.".
split_col = split(df["stock"], r"\.", 2)

# Replace the raw "stock" column with a numeric id and the remaining name.
# NOTE(review): cast to int yields null for a non-numeric prefix under Spark's
# default (non-ANSI) casting — confirm every id is numeric.
df = (
    df
    .withColumn("stockNo", split_col.getItem(0).cast(IntegerType()))
    .withColumn("stockName", split_col.getItem(1))
    .select("stockNo", "stockName", "date", "price", "volume")
)

df.show(truncate=False)

+-------+--------------------------------------------------+----------+-----+-------+
|stockNo|stockName                                         |date      |price|volume |
+-------+--------------------------------------------------+----------+-----+-------+
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-01|18.98|50635.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-04|18.52|51616.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-05|19.15|54898.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-06|19.71|41555.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-07|19.17|44430.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-08|18.94|72673.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-11|19.1 |45426.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-12|19.39|61457.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc.

In [7]:
# Total number of parsed rows before any cleaning. `df` is not cached,
# so this action re-reads the source file.
df.count()

38374198

In [10]:
# Count missing values per column.
# `dropna()` (used in the next cell) removes rows with null in any column AND
# rows with NaN in floating-point columns, so NaN is counted alongside null for
# double/float columns. That keeps this audit in sync with what dropna removes.
float_cols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}


def _missing_count(c):
    """Aggregate expression counting rows where column `c` is null (or NaN for floats)."""
    is_missing = F.col(c).isNull()
    if c in float_cols:
        is_missing = is_missing | F.isnan(c)
    # count() skips nulls, so emit a non-null marker only for missing rows.
    return F.count(F.when(is_missing, F.lit(1))).alias(c)


df_agg = df.agg(*[_missing_count(c) for c in df.columns])
df_agg.show()

+-------+---------+----+-----+------+
|stockNo|stockName|date|price|volume|
+-------+---------+----+-----+------+
|      0|        0|   0|    0|    25|
+-------+---------+----+-----+------+



In [11]:
# Drop every row that has a missing value in any column, into a new frame
# (`df` itself is left as-is for comparison with the raw data).
df_pre = df.na.drop(how="any")

df_pre.show(truncate=False)

+-------+--------------------------------------------------+----------+-----+-------+
|stockNo|stockName                                         |date      |price|volume |
+-------+--------------------------------------------------+----------+-----+-------+
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-01|18.98|50635.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-04|18.52|51616.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-05|19.15|54898.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-06|19.71|41555.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-07|19.17|44430.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-08|18.94|72673.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-11|19.1 |45426.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc._CRAI|2016-01-12|19.39|61457.0|
|32843  |Nordamerika_USA-NASDAQ_CRA-International-Inc.

In [12]:
# Rows remaining after dropna(); the difference from the raw df.count() above
# is the number of rows that were dropped.
df_pre.count()

38374173

In [16]:
# Column names of the cleaned frame, in schema order, kept in `cols` for later cells.
cols = [field.name for field in df_pre.schema.fields]
print(cols)

['stockNo', 'stockName', 'date', 'price', 'volume']
