# Spark Tutorial Notebook

### Setup

In [58]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, 
                               StructType,
                               StringType, 
                               IntegerType,
                               FloatType,
                               DateType)

In [59]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

### Build Schema and Read Data

In [60]:
data_schema = [
    StructField("ticker", StringType(), True),
    StructField("open", FloatType()),
    StructField("close", FloatType()),
    StructField("adj_close", FloatType()),
    StructField("low", FloatType()),
    StructField("high", FloatType()),
    StructField("volume", FloatType()),
    StructField("date", DateType())
]

final_struct = StructType(fields=data_schema)

data_frame = spark.read.csv("historical_stock_prices.csv", schema=final_struct)
data_frame.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- open: float (nullable = true)
 |-- close: float (nullable = true)
 |-- adj_close: float (nullable = true)
 |-- low: float (nullable = true)
 |-- high: float (nullable = true)
 |-- volume: float (nullable = true)
 |-- date: date (nullable = true)



In [73]:
data_frame = data_frame.withColumnRenamed("ticker", "symbol")
data_frame = data_frame.withColumnRenamed("open", "opening_price")
data_frame = data_frame.withColumnRenamed("close", "closing_price")
data_frame = data_frame.withColumnRenamed("low", "lowest_price")
data_frame = data_frame.withColumnRenamed("high", "highest_price")

data_frame = data_frame.withColumn("volume_in_millions", data_frame["volume"] / 1000000)

data_frame.head(2)

[Row(symbol=None, opening_price=None, closing_price=None, adj_close=None, lowest_price=None, highest_price=None, volume=None, date=None, volume_in_millions=None),
 Row(symbol='AHH', opening_price=11.5, closing_price=11.579999923706055, adj_close=8.493154525756836, lowest_price=11.25, highest_price=11.680000305175781, volume=4633900.0, date=datetime.date(2013, 5, 8), volume_in_millions=4.6339)]

### Spark SQL Example

In [86]:
data_frame.createOrReplaceTempView("stocks")

In [88]:
results = spark.sql("SELECT * FROM stocks WHERE symbol='MSFT'")
results.show()

+------+-------------+-------------+-----------+------------+-------------+-----------+----------+------------------+
|symbol|opening_price|closing_price|  adj_close|lowest_price|highest_price|     volume|      date|volume_in_millions|
+------+-------------+-------------+-----------+------------+-------------+-----------+----------+------------------+
|  MSFT|  0.088541664|  0.097222224| 0.07085974| 0.088541664|    0.1015625|1.0317888E9|1986-03-13|         1031.7888|
|  MSFT|  0.097222224|   0.10069445|  0.0733905| 0.097222224|   0.10243055|   3.0816E8|1986-03-14|            308.16|
|  MSFT|   0.10069445|   0.10243055| 0.07465584|  0.10069445|   0.10329861| 1.331712E8|1986-03-17|          133.1712|
|  MSFT|   0.10243055|   0.09982639|0.072757795| 0.098958336|   0.10329861|  6.77664E7|1986-03-18|           67.7664|
|  MSFT|   0.09982639|  0.098090276|0.071492456| 0.097222224|   0.10069445|  4.78944E7|1986-03-19|           47.8944|
|  MSFT|  0.098090276|   0.09548611|  0.0695944|  0.0946

In [None]:
database_url = "0.0.0.0:5432"


data_frame.write.format("jdbc").options(
    url= "jdbc:" + database_url,
    driver='org.postgresql.Driver',
     dbtable='pyspark_user',
      user='postgres',
      password='').mode('append').save()
)