In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, LongType
from pyspark.sql import SparkSession  #allows us to work in structure data/schema

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("New York Stock Data")
    .getOrCreate()
)

In [None]:
# The third argument to StructType.add() is the `nullable` flag: True means the column may contain null values.

In [None]:
# Explicit schema for the NYSE price file: one add() call per column, in file order.
# The third argument of add() is the nullable flag; every column allows nulls.
schema_nyse = (
    StructType()
    .add("exchange_name", StringType(), True)
    .add("stock_id", StringType(), True)
    .add("stock_dt", StringType(), True)  # date kept as a plain string
    .add("open", DoubleType(), True)
    .add("high", DoubleType(), True)
    .add("low", DoubleType(), True)
    .add("close", DoubleType(), True)
    .add("volume", LongType(), True)
    .add("adj_close", DoubleType(), True)
)

In [None]:
# Print the schema object to check the field names, types and nullable flags.
print(schema_nyse)

StructType([StructField('exchange_name', StringType(), True), StructField('stock_id', StringType(), True), StructField('stock_dt', StringType(), True), StructField('open', DoubleType(), True), StructField('high', DoubleType(), True), StructField('low', DoubleType(), True), StructField('close', DoubleType(), True), StructField('volume', LongType(), True), StructField('adj_close', DoubleType(), True)])


In [None]:
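# For files that are not comma-separated, set the separator with .option("delimiter", "<char>").
# Leading rows can be skipped with .option("skipRows", "1"); rows in the middle of the file cannot be skipped.
# NOTE(review): `skipRows` does not appear in the Apache Spark CSV option list - confirm that this runtime supports it.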

In [None]:
# Load the headerless NYSE CSV file, applying the explicit schema defined above
# instead of letting Spark infer column names and types.
df_w_schema = (
    spark.read
    .format("csv")
    .option("header", "False")
    .schema(schema_nyse)
    .load("/content/sample_data/NYSE.csv")
)

In [None]:
# Print the DataFrame's schema as a tree to confirm the column names and types.
df_w_schema.printSchema()

root
 |-- exchange_name: string (nullable = true)
 |-- stock_id: string (nullable = true)
 |-- stock_dt: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- adj_close: double (nullable = true)



In [None]:
# Count the rows loaded from the CSV file.
df_w_schema.count()

735026

In [None]:
# Preview the first 10 rows; show() called without an argument prints 20 rows.
df_w_schema.show(10)

+-------------+--------+----------+----+----+----+-----+------+---------+
|exchange_name|stock_id|  stock_dt|open|high| low|close|volume|adj_close|
+-------------+--------+----------+----+----+----+-----+------+---------+
|         NYSE|     AEA|2010-02-08|4.42|4.42|4.21| 4.24|205500|     4.24|
|         NYSE|     AEA|2010-02-05|4.42|4.54|4.22| 4.41|194300|     4.41|
|         NYSE|     AEA|2010-02-04|4.55|4.69|4.39| 4.42|233800|     4.42|
|         NYSE|     AEA|2010-02-03|4.65|4.69| 4.5| 4.55|182100|     4.55|
|         NYSE|     AEA|2010-02-02|4.74| 5.0|4.62| 4.66|222700|     4.66|
|         NYSE|     AEA|2010-02-01|4.84|4.92|4.68| 4.75|194800|     4.75|
|         NYSE|     AEA|2010-01-29|4.97|5.05|4.76| 4.83|222900|     4.83|
|         NYSE|     AEA|2010-01-28|5.12|5.22|4.81| 4.98|283100|     4.98|
|         NYSE|     AEA|2010-01-27|4.82|5.16|4.79| 5.09|243500|     5.09|
|         NYSE|     AEA|2010-01-26|5.18|5.18|4.81| 4.84|554800|     4.84|
+-------------+--------+----------+---

In [None]:
# Expose the DataFrame to Spark SQL under the view name "nyse". The view name is
# independent of the DataFrame variable name and may even be identical to it.
# createOrReplaceTempView is used instead of registerTempTable, which has been
# deprecated since Spark 2.0; it also replaces an existing view of the same name,
# so the cell can be re-run safely.
df_w_schema.createOrReplaceTempView("nyse")




In [None]:
# Total traded volume per stock symbol, computed with Spark SQL over the "nyse" view.
df_TotStockVol = spark.sql(
    "select stock_id,sum(volume) from nyse group by stock_id"
)

In [None]:
# Number of rows in the aggregate, i.e. one row per distinct stock_id.
df_TotStockVol.count()

203

In [None]:
# Display every aggregated row. The row limit is taken from the DataFrame itself
# rather than hard-coded as 203, so the listing stays complete if the input data
# (and therefore the number of distinct stock_ids) changes.
# Note: count() runs the aggregation once more before show() does.
df_TotStockVol.show(df_TotStockVol.count())

+--------+-----------+
|stock_id|sum(volume)|
+--------+-----------+
|     AXP|40263020300|
|     AAV|  834246600|
|     ARM| 2087366800|
|     ASH| 2717488700|
|     AEB|   53273300|
|     ALE|  465167800|
|     ACH| 1448279800|
|     ASF|  848352700|
|     ABK|11899868300|
|     ATU| 1226088700|
|      AM| 2963437400|
|      AA|42061448400|
|     ALL|11492379500|
|     ADI|14597316000|
|     AKP|   34156700|
|     ARK|  408851300|
|     ANN| 4892554900|
|     ABM|  675519400|
|     AOS|  601422200|
|     ABX|16691172100|
|     ADX|  323399200|
|     ATE|   38969400|
|     ACO|  519620500|
|     AGD|  100765300|
|     ACC|  495415800|
|     ARE|  759981300|
|     AIG| 7062693700|
|     APH| 3807963100|
|     AAP| 2802701500|
|     ADM|15354593500|
|     AFG| 1815621200|
|      AU| 3143678800|
|     AOL|  147580700|
|     AVB| 1870368200|
|     AZN| 3418077300|
|     AIZ| 1676102800|
|     AHC|   78301600|
|     APC|15555731900|
|     ANW|  327122100|
|     APL|  364876100|
|     AVT| 

In [None]:
# Number of partitions of the RDD backing the aggregated DataFrame.
df_TotStockVol.rdd.getNumPartitions()

1

In [None]:
# Persist the per-stock volume totals as CSV files under the target directory.
# The writer's default save mode raises an error when the directory already
# exists, which makes this cell fail on a second run (or on Restart & Run All).
# "overwrite" replaces the previous output so the notebook stays re-runnable.
df_TotStockVol.write.mode("overwrite").csv("/content/sample_data/pyspark1")