<a href="https://colab.research.google.com/github/Michaelwwk/test/blob/main/03_Apache_Spark_Core_RDD_Operations_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Apache PySpark
In this demo we will see how to run PySpark in a Google Colaboratory notebook. We will also perform some basic exploratory data tasks common to data science problems.



## PySpark Install

The first step is to install pyspark. The next step is to install the findspark library.

*Note: the --ignore-installed flag tells pip to ignore any previously installed copy and install the package afresh.*


In [86]:
# install pyspark using pip
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark

In [87]:
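import os

# Shell commands run with "!" do not raise Python exceptions, so try/except cannot catch a failed clone.
# Check for the folder instead and clone only when it is missing.
if not os.path.isdir("test"):
  !git clone https://github.com/Michaelwwk/test.git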


In [88]:
import yfinance as yf
import pyarrow.csv as pv
import pyarrow.parquet as pq
import collections

from pyspark.sql import SparkSession
from google.colab import drive
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as func
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("Stock Ticker").config('spark.ui.port', '4050').getOrCreate()
# drive.mount('/content/drive')

someFile = "/content/test/wordcount.txt"
stock_folder = "/content/test/StockData"
stocks = spark.read.csv(stock_folder, header=True)
peopleDF = spark.read.json("/content/test/people.json")

df_csv = spark.read.format('csv') \
                .option("inferSchema","true") \
                .option("header","true") \
                .option("sep",";") \
                .load("/content/test/people.csv")

# JSON files have no header row or field separator, and their schema is always inferred,
# so the CSV-only options are not needed here
df_json = spark.read.format('json') \
                .load("/content/test/people.json")

# read and convert hdb resale price
hdb_table = pv.read_csv("/content/test/resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.csv")
pq.write_table(hdb_table,'/content/test/resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')
hdb_parquet = pq.ParquetFile('/content/test/resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')

## Spark Session

We import the basic object SparkSession from the Spark Framework. In PySpark, a Spark Session is a unified entry point for reading data, configuring the system, and managing various Spark services.

Here's a breakdown of what the Spark Session does:

1. Unified Entry Point: It's the central point to access all Spark functionality, making it simpler and more intuitive to use Spark for development.
2. Data Reading and Writing: We use the Spark Session to read data from various sources (like HDFS, S3, JDBC, Hive, etc.) and write data to various sinks.
3. Configuration Management: It allows us to configure various aspects of the Spark application, such as setting configuration parameters.
4. Creating DataFrames and Datasets: The Spark Session provides methods to create DataFrames and Datasets, which are the core data structures in Spark.
5. Execution of SQL Queries: We can run SQL queries by using the Spark Session, especially when dealing with structured data.
6. Managing Spark Services: It also manages underlying Spark services such as the SparkContext, which remains reachable through spark.sparkContext.

In PySpark, a Spark Session is created through the SparkSession.builder attribute, ending the chain with getOrCreate(). Here's an example:

In [89]:
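# The session was already created in the setup cell above, so this call is kept for reference only.
# import collections
# spark = SparkSession.builder.master("local").appName("My App ").getOrCreate()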

In [90]:
spark.sparkContext

## Line Count
Count the number of lines in a text file.

In [91]:
# someFile points to wordcount.txt in the cloned repository
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
print(spark.read.text(someFile).count())


20


## Mounting Google Drive
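Connect to Google Drive. The mount call is left commented out because this notebook reads its data from the cloned repository.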

In [92]:
# to read in data from a text file, first upload the data file into your google drive and then mount your google drive onto colab
# to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True)
# drive.mount('/content/drive', force_remount=True)

## Actions and Transformations

In PySpark, operations on RDDs can be broadly classified into two categories: transformations and actions. Transformations create a new RDD from an existing one, while actions return a value after running a computation on the RDD. Below are simple examples demonstrating the use of transformations and actions.
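The split between transformations and actions matters because Spark evaluates transformations lazily: nothing is computed until an action asks for a result. The following is a plain-Python sketch of that idea, not Spark code. The class `LazySeq` and the helper `square` are hypothetical names invented for this illustration.

```python
class LazySeq:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)

    def map(self, fn):
        # Transformation: returns a new LazySeq, nothing is computed yet
        return LazySeq(self._data, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded
        return LazySeq(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: replay the recorded steps and return a plain list
        items = self._data
        for kind, fn in self._steps:
            if kind == "map":
                items = [fn(x) for x in items]
            else:
                items = [x for x in items if fn(x)]
        return items


calls = []  # records every time square() actually runs


def square(x):
    calls.append(x)
    return x * x


pipeline = LazySeq([1, 2, 3, 4, 5]).map(square).filter(lambda x: x % 2 == 1)
calls_before_action = len(calls)  # still 0: map() has not run square() yet
result = pipeline.collect()       # the action triggers the computation

print(calls_before_action)  # 0
print(result)               # [1, 9, 25]
print(len(calls))           # 5
```

A real RDD additionally splits the data into partitions and runs the steps in parallel, which this sketch leaves out.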

### Transformations

#### Map
Applies a function to each element and returns a new RDD.


In [93]:
rdd = spark.sparkContext.parallelize((1, 2, 3, 4, 5))
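# Python's built-in map(function, collection) runs on a single machine and is limited by its memory.
# rdd.map(function) is the scalable counterpart: it runs in parallel across partitions and returns a new RDD.
# collect() brings the elements of that RDD back to the driver as a Python list.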
print(tuple(rdd.map(lambda x: x * x).collect()))

(1, 4, 9, 16, 25)


#### Filter
Returns a new RDD containing only the elements that satisfy a condition.

In [94]:
print(rdd.filter(lambda x: x % 2 == 0).collect())  # Keeps even numbers


[2, 4]


#### FlatMap
Similar to map, but each input item can be mapped to 0 or more output items.

In [95]:
words = spark.sparkContext.parallelize(["hello world", "hi", "hello mars", "hello jupiter", "hello saturn"])
print(words.flatMap(lambda x: x.split(" ")).collect())

['hello', 'world', 'hi', 'hello', 'mars', 'hello', 'jupiter', 'hello', 'saturn']
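To make the difference between map and flatMap concrete, here is a plain-Python sketch (not Spark code) that applies the same split function both ways. The variable names are chosen for this illustration only.

```python
from itertools import chain

lines = ["hello world", "hi", "hello mars"]

# map-like behaviour: one output item per input item, so the result is a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap-like behaviour: the per-line lists are flattened into a single sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['hello', 'world'], ['hi'], ['hello', 'mars']]
print(flat_mapped)  # ['hello', 'world', 'hi', 'hello', 'mars']
```

With map the result keeps one entry per line; with flatMap the result has one entry per word.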


#### Distinct
Returns a new RDD containing distinct elements from the original RDD.

In [96]:
print(words.flatMap(lambda x: x.split(" ")).distinct().collect())

['hello', 'world', 'hi', 'mars', 'jupiter', 'saturn']


In [97]:
tuples = spark.sparkContext.parallelize((1, 1, 2, 3, 3, 4))
print(tuples.distinct().collect())


[1, 2, 3, 4]


### Actions

#### Collect
Returns all the elements of the RDD to the driver program as a Python list.

In [98]:
print(tuples.distinct().collect())

[1, 2, 3, 4]


#### Count
Returns the number of elements in the RDD.

In [99]:
print(tuples.count())

6


#### Take
Returns a list with the first n elements of the RDD.

In [100]:
first_three = tuples.take(3)
print(first_three)


[1, 1, 2]


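#### Reduce
Aggregates the elements of the RDD using a function that takes two arguments and returns one. The function should be commutative and associative so that it can be applied in parallel.

In [101]:
# named total to avoid shadowing Python's built-in sum()
total = tuples.reduce(lambda a, b: a + b)
print(total)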


14
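A reduce over a distributed dataset is applied within each partition first, and the partial results are then combined. The sketch below imitates that in plain Python to show why addition is safe and subtraction is not. The helper `partitioned_reduce` and the way the data is split are assumptions made for illustration, not Spark's actual partitioning.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(partitions, fn):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(fn, part) for part in partitions]
    return reduce(fn, partials)


data = [1, 1, 2, 3, 3, 4]
one_partition = [data]
two_partitions = [data[:3], data[3:]]

# Addition is associative and commutative: the split does not matter
sum_one = partitioned_reduce(one_partition, add)
sum_two = partitioned_reduce(two_partitions, add)

# Subtraction is neither: the answer depends on how the data was split
diff_one = partitioned_reduce(one_partition, sub)
diff_two = partitioned_reduce(two_partitions, sub)

print(sum_one, sum_two)    # 14 14
print(diff_one, diff_two)  # -12 2
```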


These examples illustrate basic RDD operations in PySpark for manipulating and analyzing large datasets. To run them, make sure a SparkSession is active; the examples reach the SparkContext through spark.sparkContext.

### How to pretty print in PySpark?

Combining take() with a loop over the returned elements gives a readable, pretty-print-like view of an RDD. Use take() with a small n so that only a few elements are pulled to the driver.

In [102]:
pprint()

Pretty printing has been turned ON
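As a sketch of the take-then-iterate pattern, the example below uses a plain Python list in place of an RDD and formats each element with the standard library's pprint module. The `records` data and the `take` helper are hypothetical stand-ins. With a real RDD, the list would come from `rdd.take(n)`.

```python
from pprint import pformat

# Hypothetical records standing in for the elements of an RDD
records = [
    ("Alice", {"fruit": "Apple", "count": 1, "tags": ["red", "sweet"]}),
    ("Bob", {"fruit": "Banana", "count": 2, "tags": ["yellow", "soft"]}),
    ("Charlie", {"fruit": "Cherry", "count": 3, "tags": ["red", "small"]}),
]


def take(items, n):
    """Stand-in for rdd.take(n): return only the first n elements."""
    return items[:n]


# Format only a small sample, one element at a time
for record in take(records, 2):
    print(pformat(record, width=40))
```

Limiting the width forces nested values onto separate lines, which is easier to read than one long line per element.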


### Key Operations

PySpark provides key-based operations on RDDs of (key, value) pairs, including groupByKey, reduceByKey, and sortByKey. Let us look at how they work.

#### groupByKey
This operation groups the values for each key in the RDD into a single sequence.

#### reduceByKey
This operation merges the values for each key using an associative reduce function.

#### sortByKey
This operation sorts the dataset by keys.

Let us put together an example to compare and contrast them.


In [103]:
rdd = spark.sparkContext.parallelize([(3, 6),(1, 2),(3, 4)])
grouped = rdd.groupByKey()
for key, values in grouped.collect():
    print(f"{key}: {tuple(values)}")
reduced = rdd.reduceByKey(lambda a, b: a + b)
print(reduced.collect())
sorted_rdd = rdd.sortByKey()
print(sorted_rdd.collect())

3: (6, 4)
1: (2,)
[(3, 10), (1, 2)]
[(1, 2), (3, 6), (3, 4)]


Please note that these operations are transformations and require an action like collect to retrieve the data. Also, keep in mind that groupByKey can cause a lot of data shuffling over the network, and it's generally more efficient to use reduceByKey where possible because it combines output values locally before sending data over the network.
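The shuffle difference described above can be sketched in plain Python by counting how many records would leave each partition. This is a simplified model, not Spark's implementation. The functions `group_by_key` and `reduce_by_key` and the two-partition layout are assumptions for illustration.

```python
from collections import defaultdict

# Two hypothetical partitions of (key, value) pairs
partitions = [[(3, 6), (1, 2)], [(3, 4), (3, 1)]]


def group_by_key(parts):
    # Every record is sent over the network before grouping
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return dict(grouped), len(shuffled)


def reduce_by_key(parts, fn):
    shuffled = []
    for part in parts:
        # Combine values locally first: at most one record per key leaves a partition
        local = {}
        for key, value in part:
            local[key] = fn(local[key], value) if key in local else value
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = fn(merged[key], value) if key in merged else value
    return merged, len(shuffled)


grouped, grouped_moved = group_by_key(partitions)
reduced, reduced_moved = reduce_by_key(partitions, lambda a, b: a + b)

print(grouped, grouped_moved)  # {3: [6, 4, 1], 1: [2]} 4
print(reduced, reduced_moved)  # {3: 11, 1: 2} 3
```

The saving grows with the number of repeated keys per partition: here the second partition sends one record for key 3 instead of two.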

### Sampling

The sample() transformation draws a random fraction of the data from an RDD. You can sample with or without replacement.

#### Sampling without replacement

The sample method takes these arguments:

1. The first argument is withReplacement. Set it to False for sampling without replacement, meaning a particular element (for example, a customer) can be chosen at most once.
2. The second argument is the fraction of the data to sample. A value of 0.1, for example, selects approximately 10% of the data.

Calling sample(False, 0.1) on an RDD of customers (called customers_rdd here) returns a random sample of those customers. Use collect() on the result with caution if the dataset is large, as it gathers all the sampled data on the driver node.
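In PySpark the call would look like `customers_rdd.sample(False, 0.1)`, where `customers_rdd` is a hypothetical RDD of customers. To show why the sample size is only approximately 10%, here is a plain-Python sketch of the idea behind sampling without replacement: each element is kept independently with probability equal to the fraction. The function name and the customer data are assumptions for illustration, and the sketch is a simplified model rather than Spark's own code.

```python
import random


def sample_without_replacement(items, fraction, seed=None):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]


customers = [f"customer_{i}" for i in range(1000)]
picked = sample_without_replacement(customers, 0.1, seed=42)

print(len(picked))                      # close to 100, but not exactly 100
print(len(picked) == len(set(picked)))  # True: no customer appears twice
```

Because every element is decided independently, two runs with different seeds generally return samples of different sizes.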

#### Sampling with replacement

Calling sample() with withReplacement set to True samples with replacement. This means an element can be included in the sample multiple times.

In both modes, the result is a sample of customers drawn from the original RDD. The actual elements in the sample vary from run to run because the sampling is random, unless a fixed seed is passed as the third argument.
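In PySpark the call would look like `customers_rdd.sample(True, 0.5)`, again with a hypothetical `customers_rdd`. The plain-Python sketch below models sampling with replacement by drawing, for each element, how many copies to emit from a Poisson distribution whose mean is the fraction. This is a simplified model for illustration. The helper names are assumptions, and the Poisson draw uses Knuth's multiplication method because the standard library has no Poisson generator.

```python
import math
import random


def poisson(mean, rng):
    """Draw a Poisson-distributed count using Knuth's multiplication method."""
    limit = math.exp(-mean)
    count = 0
    product = 1.0
    while True:
        product *= rng.random()
        if product <= limit:
            return count
        count += 1


def sample_with_replacement(items, fraction, seed=None):
    """Emit each item 0, 1, 2, ... times; the expected number of copies is `fraction`."""
    rng = random.Random(seed)
    sampled = []
    for item in items:
        sampled.extend([item] * poisson(fraction, rng))
    return sampled


customers = [f"customer_{i}" for i in range(1000)]
picked = sample_with_replacement(customers, 0.5, seed=7)

print(len(picked))                     # close to 500
print(len(picked) > len(set(picked)))  # True: some customers appear more than once
```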

### More Transformations
In PySpark, you can perform various RDD operations such as union, join, and cartesian (cross) to combine data in different ways. Here are simple examples for each:
#### Union
The union operation combines two RDDs to form a new RDD that contains elements from both RDDs.

In [104]:
# Create two RDDs
rdd1 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])
rdd2 = spark.sparkContext.parallelize([("Charlie", 3), ("David", 4)])

# Perform the union operation
union_rdd = rdd1.union(rdd2)

# Collect and print the results
print(union_rdd.collect())



[('Alice', 1), ('Bob', 2), ('Charlie', 3), ('David', 4)]


#### Join
The join operation performs an inner join of two RDDs of (key, value) pairs on their keys.

In [105]:

# Create two RDDs with common keys
rdd3 = spark.sparkContext.parallelize([("Alice", "Apple"), ("Bob", "Banana")])
rdd4 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])

# Perform the join operation
join_rdd = rdd3.join(rdd4)

# Collect and print the results
print(join_rdd.collect())


[('Alice', ('Apple', 1)), ('Bob', ('Banana', 2))]
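The join above is an inner join: keys that appear in only one RDD are dropped, and a key that appears several times produces one output pair per combination. The plain-Python sketch below shows those two effects. The function `inner_join` and the extra rows ("Carol", a second "Bob") are made up for illustration.

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair up values that share a key; keys present on only one side are dropped."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


fruits = [("Alice", "Apple"), ("Bob", "Banana"), ("Carol", "Cherry")]
counts = [("Alice", 1), ("Bob", 2), ("Bob", 3)]

joined = inner_join(fruits, counts)
print(joined)
# [('Alice', ('Apple', 1)), ('Bob', ('Banana', 2)), ('Bob', ('Banana', 3))]
```

"Carol" has no match in `counts` and is missing from the result, while "Bob" appears twice.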


#### Cross or Cartesian
The cartesian operation returns all possible pairs of (a, b) where a is in the first RDD and b is in the second RDD.

In [106]:
# Create two RDDs
rdd5 = spark.sparkContext.parallelize([1, 2])
rdd6 = spark.sparkContext.parallelize(["a", "b"])

# Perform the cartesian operation
cross_rdd = rdd5.cartesian(rdd6)

# Collect and print the results
print(cross_rdd.collect())

[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


Please note that the cartesian operation can be very expensive in terms of computation and memory usage, especially with large datasets, because it forms all possible combinations of elements between the two RDDs.
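The cost comes from the size of the result, which is the product of the two input sizes. The plain-Python sketch below reproduces the small example with itertools.product and then computes how quickly the pair count grows. The sizes in the loop are arbitrary illustrative values.

```python
from itertools import product

left = [1, 2]
right = ["a", "b"]

# Same pairs as the cartesian example above
pairs = list(product(left, right))
print(pairs)  # [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

# The number of pairs is len(left) * len(right)
pair_counts = {n: n * n for n in (10, 1_000, 100_000)}
for size, count in pair_counts.items():
    print(f"{size} x {size} elements -> {count} pairs")
```

Two inputs of 100,000 elements each already yield ten billion pairs.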

### More Actions

#### save
An RDD can be saved in a variety of formats, including text files, sequence files, and other file-based data sources.

We will cover this action after the NoSQL lecture.

End of Demo

Thank you for listening patiently. 🙏🌞

<a href="https://colab.research.google.com/github/suriarasai/BEAD2024/blob/main/colab/04_Stock_Price_Analysis_using_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Stock Price Analysis using Spark
Stock price prediction is a vital part of financial analysis, and it can be a daunting task to obtain and process data to make accurate predictions. Apache Spark can be a powerful tool in building a data pipeline for stock price prediction.

The goal of this case study is to utilise Apache Spark to analyse stock price data and gain insights into the trends in the company’s stock values over time.

## Ingestion and Cleansing
Data was collected from Yahoo Finance App.

https://finance.yahoo.com/lookup

The data for this case study is publicly available on Yahoo Finance, and it includes information about a company’s daily stock values from 2023.

We will load the data shared in My Drive.

In [107]:
# # to read in data from a text file, first upload the data file into your google drive and then mount your google drive onto colab
# from google.colab import drive
# # to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True)
# drive.mount('/content/drive', force_remount=True)

Let us store the location of the stock files in the variable stock_folder.

## PySpark and Spark Session

pyspark and findspark were already installed at the start of this notebook, so the install commands below are left commented out.

*Note: the --ignore-installed flag tells pip to ignore any previously installed copy and install the package afresh.*

In [108]:
# # install pyspark using pip
# !pip install --ignore-installed -q pyspark
# # install findspark using pip
# !pip install --ignore-installed -q findspark

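The next step is to create an active Spark session object. The session and the imports were already set up in the first code cell, so the code below is kept commented out for reference.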

In [109]:
# from pyspark.sql import SparkSession
# import yfinance as yf
# stock_folder = "/content/drive/MyDrive/data/StockData"
# # stock_prices_data.to_csv("aapl_stock_prices_data.csv")

# from pyspark.sql.functions import udf
# from pyspark.sql.types import DateType
# from datetime import datetime

# from pyspark.sql.types import FloatType
# from pyspark.sql.types import IntegerType
# import pyspark.sql.functions as func
# from pyspark.sql.window import Window

# ## Reading CSV data => Stocks
# stocks = spark.read.csv(stock_folder, header=True)

# # import collections
# spark = SparkSession.builder.master("local").appName("Stock Ticker").config('spark.ui.port', '4050').getOrCreate()

Let us look at how to collect stock ticker data into files. There are different data sources for obtaining stock data, such as Yahoo Finance, Google Finance, and Alpha Vantage.

We will use Yahoo Finance. This code downloads Apple Inc. (AAPL) daily stock prices from January 1, 2020 up to, but not including, December 28, 2023 (yfinance treats the end date as exclusive) and saves the data in a CSV file called "aapl_stock_prices_data.csv".

We can change the code to obtain data for a different company or time period.

In [110]:
# yfinance was imported as yf in the setup cell at the top of the notebook

ticker = "AAPL"  # Replace with the ticker symbol of the company you want to analyze
start_date = "2020-01-01"
end_date = "2023-12-28"

stock_prices_data = yf.download(ticker, start=start_date, end=end_date)

stock_prices_data.to_csv("aapl_stock_prices_data.csv")

[*********************100%%**********************]  1 of 1 completed


Now, let us load all the stock files for this tutorial into a DataFrame by pointing Spark at the folder that contains them.

Let us take a look at the data.

In [111]:
## Seeing Data => Dataframe
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [112]:
## Seeing Schema of the Data => Data Types in Dataframe
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [113]:
## Basic select operation => Select the Ticker column
stocks.select("Ticker").show(3)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 3 rows



In [114]:
stocks.select(["Ticker", "Date", "Open"]).show(5)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
+------+----------+--------+
only showing top 5 rows



In [115]:
## Filtering Data => Select rows for Microsoft stock and show the first 10
stocks.filter(stocks.Ticker == "MSFT").show(10)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  MSFT|05/24/2023|  $313.85 |23384890|$314.73 |$316.50 |$312.61 |
|  MSFT|05/23/2023|  $315.26 |30797170|$320.03 |$322.72 |$315.25 |
|  MSFT|05/22/2023|  $321.18 |24115660|$318.60 |$322.59 |$318.01 |
|  MSFT|05/19/2023|  $318.34 |27546700|$316.74 |$318.75 |$316.37 |
|  MSFT|05/18/2023|  $318.52 |27275990|$314.53 |$319.04 |$313.72 |
|  MSFT|05/17/2023|  $314.00 |24315010|$312.29 |$314.43 |$310.74 |
+------+----------+----------+--------+--------+--------+--------+
only showing top 10 rows



In [116]:
stocks.filter((stocks.Ticker == "MSFT") & (stocks.Date == "05/31/2023")).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
+------+----------+----------+--------+--------+--------+--------+



In [117]:
stocks.filter(((stocks.Ticker == "MSFT") | (stocks.Ticker == "V")) & (stocks.Date == "05/31/2023")).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [118]:
stocks.filter((stocks.Ticker.isin(["MSFT", "QQQ", "SPY", "V", "TSLA"])) & (stocks.Date == "05/31/2023")).show()

+------+----------+----------+---------+--------+--------+--------+
|Ticker|      Date|Close/Last|   Volume|    Open|    High|     Low|
+------+----------+----------+---------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 | 45950550|$332.29 |$335.94 |$327.33 |
|  TSLA|05/31/2023|  $203.93 |150711700|$199.78 |$203.95 |$195.12 |
|     V|05/31/2023|  $221.03 | 20460620|$219.96 |$221.53 |$216.14 |
|   SPY|05/31/2023|    417.85|110811800|  418.28|  419.22|  416.22|
|   QQQ|05/31/2023|    347.99| 65105380|  348.37|   350.6|  346.51|
+------+----------+----------+---------+--------+--------+--------+



In [119]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



#### Defining the User-Defined Function (UDF):

1. date_parser: This is a UDF that takes a date string and converts it to a DateType. It's defined using Python's lambda function.
2. lambda date: datetime.strptime(date, "%m/%d/%Y"): The lambda function takes a date string as input, and uses datetime.strptime to parse it into a datetime object based on the specified format ("%m/%d/%Y"). This format represents month/day/year, e.g., "12/31/2023".
3. DateType(): The UDF is registered with a return type of DateType, meaning it will convert the output of the datetime.strptime function into a PySpark DateType.

Applying the UDF to a DataFrame:
1. stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date)): This line applies the date_parser UDF to the Date column of the stocks DataFrame. It creates a new column, ParsedDate, which contains the parsed dates.
2. withColumn: This method is used to add a new column or replace an existing column in a DataFrame. The first argument is the name of the new column, and the second argument is the column itself (in this case, generated by applying the UDF).
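The parsing step inside the UDF is ordinary Python and can be tried outside Spark. The sketch below applies the same format string to a few sample values and shows why the conversion matters: dates kept as "MM/DD/YYYY" strings do not sort chronologically. The helper name `parse_us_date` and the sample values are assumptions for illustration.

```python
from datetime import date, datetime


def parse_us_date(text):
    """Parse a month/day/year string such as "05/31/2023" into a date."""
    return datetime.strptime(text, "%m/%d/%Y").date()


raw = ["12/31/2022", "01/02/2023", "05/31/2023"]

sorted_as_text = sorted(raw)
sorted_as_dates = sorted(parse_us_date(value) for value in raw)

print(sorted_as_text)   # ['01/02/2023', '05/31/2023', '12/31/2022']
print(sorted_as_dates)  # [datetime.date(2022, 12, 31), datetime.date(2023, 1, 2), datetime.date(2023, 5, 31)]

# A value in a different layout raises ValueError instead of being silently misread
try:
    parse_us_date("2023-05-31")
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # True
```

Spark also ships a built-in to_date function in pyspark.sql.functions that accepts a format pattern. It is usually a faster alternative to a Python UDF for this kind of conversion.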

In [120]:
## User Defined Functions

date_parser = udf(lambda date: datetime.strptime(date,"%m/%d/%Y"), DateType())
stocks = stocks.withColumn("ParsedDate", date_parser(stocks.Date))
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



The UDF parser_number is used in DataFrame operations to process columns. For instance, if a DataFrame column stores monetary values as strings like $100, parser_number converts these strings into floating-point numbers for further numerical processing.

parser_number wraps the Python function num_parser, which handles its input according to its type:

1. if isinstance(value, str): The value is a string. The function assumes it represents a number with a dollar sign (e.g., "$100") and converts it to a floating-point number.
   - value.strip("$"): Removes the dollar sign from the string.
   - float(...): Converts the cleaned string to a floating-point number.
2. elif isinstance(value, int) or isinstance(value, float): The value is already an integer or floating-point number, so it is returned as it is.
3. else: The value is neither a string, nor an int, nor a float, so the function returns None, indicating an inability to process the input.
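Because num_parser is plain Python, its behaviour can be checked without Spark. The sketch below copies the function as defined in this notebook and runs it on a few representative inputs, including the trailing space seen in the stock data and a value with a thousands separator. The test values are chosen for illustration.

```python
def num_parser(value):
    # Same logic as the notebook's num_parser
    if isinstance(value, str):
        return float(value.strip("$"))
    elif isinstance(value, int) or isinstance(value, float):
        return value
    else:
        return None


print(num_parser("$321.08 "))  # 321.08 (float() tolerates the trailing space)
print(num_parser("417.85"))    # 417.85 (no dollar sign, as in the SPY rows)
print(num_parser(12))          # 12
print(num_parser(None))        # None

# A thousands separator is not handled and raises ValueError
try:
    num_parser("$1,234.50")
    failed_on_comma = False
except ValueError:
    failed_on_comma = True
print(failed_on_comma)  # True
```

If the data could contain thousands separators, one option is to remove commas before calling float().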

In [121]:
def num_parser(value):
    if isinstance(value, str):
        return float(value.strip("$"))
    elif isinstance(value, int) or isinstance(value, float):
        return value
    else:
        return None

parser_number = udf(num_parser, FloatType())

In [122]:
stocks = (stocks.withColumn("Open", parser_number(stocks.Open))
                .withColumn("Close", parser_number(stocks["Close/Last"]))
                .withColumn("Low", parser_number(stocks.Low))
                .withColumn("High", parser_number(stocks.High)))

In [123]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [124]:
parse_int = udf(lambda value: int(value), IntegerType())
## Changing the datatype of the column
stocks = stocks.withColumn("Volume", parse_int(stocks.Volume))

In [125]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [126]:
cleaned_stocks = stocks.select(["Ticker", "ParsedDate", "Volume", "Open", "Low", "High", "Close"])
cleaned_stocks.show(5)

+------+----------+-------+------+------+------+------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|
+------+----------+-------+------+------+------+------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|
+------+----------+-------+------+------+------+------+
only showing top 5 rows



In [127]:
## Calculating basic statistics about data => count, mean, stddev, min and max per column
cleaned_stocks.describe(["Volume", "Open", "Low", "High", "Close"]).show()

+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|             Close|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953461E7|101.16125813324406|100.26590135955226|101.96625521621772|101.14891782168523|
|    min|              961133|             12.07|              11.8|             12.45|             11.93|
|    max|           914080943|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



## Basic Stock Analysis

In [128]:
## Calculate maximum stock price for various stocks
cleaned_stocks.groupBy("Ticker").max("Open").show(15)

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|   TSM|   141.61|
|  AAPL|   182.63|
|  META|   381.68|
|  TSLA|   411.47|
|   QQQ|   405.57|
|     V|   250.05|
| GOOGL|   151.25|
|   SPY|   479.22|
|  AMZN|    187.2|
|  MSFT|   344.62|
|  NVDA|   405.95|
+------+---------+



In [129]:

cleaned_stocks.groupBy("Ticker").max("Open").withColumnRenamed("max(Open)", "MaxStockPrice").show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|   TSM|       141.61|
|  AAPL|       182.63|
|  META|       381.68|
|  TSLA|       411.47|
|   QQQ|       405.57|
|     V|       250.05|
| GOOGL|       151.25|
|   SPY|       479.22|
|  AMZN|        187.2|
|  MSFT|       344.62|
|  NVDA|       405.95|
+------+-------------+



In [130]:
cleaned_stocks.groupBy("Ticker").agg(func.max("Open").alias("MaxStockPrice")).show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|   TSM|       141.61|
|  AAPL|       182.63|
|  META|       381.68|
|  TSLA|       411.47|
|   QQQ|       405.57|
|     V|       250.05|
| GOOGL|       151.25|
|   SPY|       479.22|
|  AMZN|        187.2|
|  MSFT|       344.62|
|  NVDA|       405.95|
+------+-------------+



In [131]:
cleaned_stocks.groupBy("Ticker").agg(
    func.max("Open").alias("MaxStockPrice"),
    func.sum("Volume").alias("TotalVolume")
).show(15)

+------+-------------+------------+
|Ticker|MaxStockPrice| TotalVolume|
+------+-------------+------------+
| BRK-B|       361.39|  5862401321|
|   TSM|       141.61| 12506470104|
|  AAPL|       182.63|139310061360|
|  META|       381.68| 30148848043|
|  TSLA|       411.47|171802975076|
|   QQQ|       405.57| 60437153773|
|     V|       250.05| 10410997871|
| GOOGL|       151.25| 43956560981|
|   SPY|       479.22|107925285300|
|  AMZN|        187.2|104503287430|
|  MSFT|       344.62| 37976660472|
|  NVDA|       405.95| 58787218324|
+------+-------------+------------+



The code starts with cleaned_stocks, which is a DataFrame that has already undergone preprocessing, including the parsing of dates.

By extracting the year, month, day, and week information from the date, the DataFrame is enhanced for more detailed analysis. For instance, you can now easily group or filter the data based on these time components.

This kind of manipulation is particularly useful in time series analysis, such as calculating the maximum price of stocks in each year, analyzing monthly trends, or understanding weekly patterns.
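The date components can be previewed in plain Python. The sketch below assumes week numbers follow ISO 8601, which is how Spark's weekofyear is documented, and uses Python's isocalendar() to mirror it. The helper `date_parts` is a hypothetical name. The second half shows a pitfall when grouping by calendar year and week number together.

```python
from datetime import date


def date_parts(day):
    """Return the calendar year, month, day and ISO week number of a date."""
    return {
        "Year": day.year,
        "Month": day.month,
        "Day": day.day,
        "Week": day.isocalendar()[1],
    }


print(date_parts(date(2023, 5, 31)))
# {'Year': 2023, 'Month': 5, 'Day': 31, 'Week': 22}

# Pitfall: 1 January 2021 belongs to ISO week 53 of 2020, so grouping on
# (calendar Year, Week) places it in a separate (2021, 53) bucket
new_year = date(2021, 1, 1)
iso_year, iso_week = new_year.isocalendar()[0], new_year.isocalendar()[1]
print(new_year.year, iso_year, iso_week)  # 2021 2020 53
```

For weekly aggregates that must not split at the turn of the year, grouping on the ISO year instead of the calendar year is one option.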

In [132]:
## Extract year, month, day and week from the parsed date => Basic date manipulation operation
cleaned_stocks = (cleaned_stocks.withColumn("Year", func.year(cleaned_stocks.ParsedDate))
                                .withColumn("Month", func.month(cleaned_stocks.ParsedDate))
                                .withColumn("Day", func.dayofmonth(cleaned_stocks.ParsedDate))
                                .withColumn("Week", func.weekofyear(cleaned_stocks.ParsedDate))
                 )
cleaned_stocks.show(10)

+------+----------+-------+------+------+------+------+----+-----+---+----+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|Year|Month|Day|Week|
+------+----------+-------+------+------+------+------+----+-----+---+----+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|2023|    5| 31|  22|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|2023|    5| 30|  22|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|2023|    5| 26|  21|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|2023|    5| 25|  21|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|2023|    5| 24|  21|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|323.11|2023|    5| 23|  21|
| BRK-B|2023-05-22|2763422|330.75|328.35|331.49|329.13|2023|    5| 22|  21|
| BRK-B|2023-05-19|4323538| 331.0|329.12|333.94|330.39|2023|    5| 19|  20|
| BRK-B|2023-05-18|2808329|326.87|325.85|329.98|329.76|2023|    5| 18|  20|
| BRK-B|2023-05-17|3047626|325.02|324.82|328.26|327.39|2023|    5| 17|  20|
+------+----------+-------+------+------+------+------+----+-----+---+----+

In [133]:
## Calculate the highest and lowest opening price for each stock per month and per week
monthly = cleaned_stocks.groupBy(['Ticker', 'Year', 'Month']).agg(func.max("Open").alias("MonthHigh"), func.min("Open").alias("MonthLow"))
weekly = cleaned_stocks.groupBy(['Ticker', 'Year', 'Week']).agg(func.max("Open").alias("WeekHigh"), func.min("Open").alias("WeekLow"))
monthly.show()

+------+----+-----+---------+--------+
|Ticker|Year|Month|MonthHigh|MonthLow|
+------+----+-----+---------+--------+
| BRK-B|2022|   10|   297.98|  260.58|
|  META|2020|    6|   241.28|  209.75|
| BRK-B|2018|    9|   222.13|  209.21|
|  MSFT|2022|    6|    275.2|  243.86|
|  MSFT|2021|    2|   245.03|  230.01|
|     V|2020|    7|    200.0|   189.5|
|   SPY|2019|    6|   296.04|  275.31|
|   QQQ|2019|    6|   189.72|  171.98|
|   QQQ|2019|    2|    174.2|  166.73|
|  MSFT|2020|    1|   174.05|  157.08|
| BRK-B|2021|   10|   290.85|  273.02|
| BRK-B|2020|   10|   216.74|  200.03|
|  TSLA|2023|    4|   199.91|  152.64|
|  TSLA|2019|    4|    19.22|   15.72|
|  AMZN|2018|    5|    81.15|   81.15|
|     V|2020|    4|   181.78|  152.53|
| GOOGL|2018|    5|     54.1|    54.1|
|  NVDA|2021|    3|    139.0|   121.3|
|  NVDA|2019|    6|    41.25|   33.98|
|     V|2018|    7|   143.08|  131.96|
+------+----+-----+---------+--------+
only showing top 20 rows



In [134]:
weekly.show()

+------+----+----+--------+-------+
|Ticker|Year|Week|WeekHigh|WeekLow|
+------+----+----+--------+-------+
| BRK-B|2022|  14|   352.0| 341.17|
| BRK-B|2022|  10|  326.59| 322.49|
| BRK-B|2021|  14|  264.22| 260.02|
|  META|2022|  43|  131.68|  97.98|
|  META|2020|   6|  212.51| 203.44|
|  TSLA|2022|  20|  255.72| 235.67|
|  TSLA|2020|  19|   52.92|  46.73|
|  TSLA|2020|  16|   51.49|  39.34|
|  TSLA|2018|  39|   20.86|  18.02|
| GOOGL|2021|  29|  130.43| 125.53|
|  NVDA|2020|  37|  129.89| 117.35|
|   SPY|2022|  42|  375.13| 364.01|
|   SPY|2021|  52|  477.93| 472.06|
|   QQQ|2022|  34|  320.28| 313.61|
|   QQQ|2022|  29|  306.43| 293.11|
|   QQQ|2021|  45|  399.16| 391.77|
| BRK-B|2018|  48|  217.23|  209.3|
|  MSFT|2022|   6|  309.87| 301.25|
|  MSFT|2021|   2|  218.47| 213.52|
|  META|2022|  40|  140.49| 136.76|
+------+----+----+--------+-------+
only showing top 20 rows



In [135]:
weekly.withColumn("Spread", weekly['WeekHigh'] - weekly['WeekLow']).show()

+------+----+----+--------+-------+---------+
|Ticker|Year|Week|WeekHigh|WeekLow|   Spread|
+------+----+----+--------+-------+---------+
| BRK-B|2022|  14|   352.0| 341.17|10.829987|
| BRK-B|2022|  10|  326.59| 322.49| 4.100006|
| BRK-B|2021|  14|  264.22| 260.02| 4.200012|
|  META|2022|  43|  131.68|  97.98| 33.69999|
|  META|2020|   6|  212.51| 203.44| 9.069992|
|  TSLA|2022|  20|  255.72| 235.67|20.050003|
|  TSLA|2020|  19|   52.92|  46.73|6.1899986|
|  TSLA|2020|  16|   51.49|  39.34|12.150002|
|  TSLA|2018|  39|   20.86|  18.02|2.8400002|
| GOOGL|2021|  29|  130.43| 125.53| 4.899994|
|  NVDA|2020|  37|  129.89| 117.35|12.540001|
|   SPY|2022|  42|  375.13| 364.01|11.119995|
|   SPY|2021|  52|  477.93| 472.06| 5.869995|
|   QQQ|2022|  34|  320.28| 313.61|6.6700134|
|   QQQ|2022|  29|  306.43| 293.11|13.320007|
|   QQQ|2021|  45|  399.16| 391.77|7.3900146|
| BRK-B|2018|  48|  217.23|  209.3|7.9299927|
|  MSFT|2022|   6|  309.87| 301.25| 8.619995|
|  MSFT|2021|   2|  218.47| 213.52

In [136]:
yearly = cleaned_stocks.groupBy(['Ticker', 'Year']).agg(func.max("Open").alias("YearlHigh"), func.min("Open").alias("YearlyLow"))
yearly.show()

+------+----+---------+---------+
|Ticker|Year|YearlHigh|YearlyLow|
+------+----+---------+---------+
|  NVDA|2020|   147.04|    50.02|
|     V|2022|   233.04|    175.0|
|  META|2020|   300.16|   139.75|
| BRK-B|2023|    331.0|   294.68|
|   TSM|2018|     45.0|    35.33|
|   TSM|2021|   141.61|    108.0|
|   SPY|2022|   479.22|  349.205|
|  MSFT|2019|   159.45|    99.55|
|  MSFT|2021|   344.62|   212.17|
| BRK-B|2018|    224.0|   185.43|
|  META|2021|   381.68|    247.9|
|  TSLA|2019|     29.0|    12.07|
|   QQQ|2020|   314.16|   170.92|
|  META|2018|   215.72|    123.1|
|  AAPL|2020|   138.05|    57.02|
|  MSFT|2020|   229.27|   137.01|
|  TSLA|2021|   411.47|   184.18|
|     V|2021|   250.05|    192.0|
|  NVDA|2018|    72.33|    31.62|
|   SPY|2020|   373.81|   228.19|
+------+----+---------+---------+
only showing top 20 rows



## Join


The cells below demonstrate a join, which combines two DataFrames based on a specified condition. The two inputs are:

*   cleaned_stocks: the DataFrame of daily stock records. It has Ticker and Year columns alongside the price, volume, and date columns.
*   yearly: the aggregate built in the previous cell. It holds the highest and lowest opening price for each Ticker and Year.

An inner join keeps only the rows that have a match in both cleaned_stocks and yearly under the join condition, which here is an equal Ticker and an equal Year. Because both inputs carry Ticker and Year columns, the joined result contains each of them twice; the later cells drop the copies that come from yearly.

This type of join is useful when you need to combine data from two sources that share common identifiers. In this case, each daily record is combined with the yearly high and low for the same ticker and year.
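To make the matching rule concrete, here is a minimal plain-Python sketch of an inner join on the (Ticker, Year) pair. The sample rows and the helper name `inner_join` are made up for illustration and are not part of PySpark; Spark applies the same matching logic, only distributed across partitions.

```python
# Plain-Python sketch of an inner join on the (Ticker, Year) key pair.
# The rows below are made-up sample data, not taken from the stock files.
daily_rows = [
    {"Ticker": "AAA", "Year": 2022, "Open": 10.0},
    {"Ticker": "AAA", "Year": 2023, "Open": 12.0},
    {"Ticker": "BBB", "Year": 2023, "Open": 50.0},
]
yearly_rows = [
    {"Ticker": "AAA", "Year": 2022, "YearlyHigh": 11.0},
    {"Ticker": "AAA", "Year": 2023, "YearlyHigh": 15.0},
]


def inner_join(left, right, keys):
    """Return merged rows for every left/right pair whose key columns match."""
    # Index the right side by key so each left row needs only a dict lookup.
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in keys), []).append(row)

    joined = []
    for row in left:
        # Left rows without a match are skipped: that is what makes it "inner".
        for match in index.get(tuple(row[k] for k in keys), []):
            joined.append({**row, **match})
    return joined


joined_rows = inner_join(daily_rows, yearly_rows, ["Ticker", "Year"])
for row in joined_rows:
    print(row)
```

The BBB row has no counterpart in `yearly_rows`, so it does not appear in the result. Merging the dictionaries also keeps a single Ticker and Year per row, which is the effect the notebook gets by dropping the duplicate columns after the join.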

In [137]:
# Inner join on Ticker and Year
cleaned_stocks.join(yearly,
                    (cleaned_stocks.Ticker == yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                    'inner'
                   ).show()

+------+----------+-------+------+------+------+------+----+-----+---+----+------+----+---------+---------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High| Close|Year|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+------+----------+-------+------+------+------+------+----+-----+---+----+------+----+---------+---------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|321.08|2023|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|322.19|2023|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63| 320.6|2023|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|319.02|2023|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|2023|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|323.11|2023|    5| 23|  21| BRK-B|2023|    331.0|   294.68|
| BRK-B|2023-05-22|2763422|3

In [138]:
cleaned_stocks.join(yearly,
                    (cleaned_stocks.Ticker==yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                    'inner'
                   ).drop(yearly.Year, yearly.Ticker).show()

+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|ParsedDate| Volume|  Open|   Low|  High| Close|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|2023-05-31|6175417|321.12|319.39|322.41|321.08|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-30|3232461|321.86| 319.0|322.47|322.19|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-26|3229873|320.44|319.67|322.63| 320.6|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-25|4251935|320.56|317.71|320.56|319.02|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-23|4031342|328.19|322.97|329.27|323.11|    5| 23|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-22|2763422|330.75|328.35|331.49|329.13|    5| 22|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-19|4323538| 331.0|329.12|333.94

In [139]:
historic_stocks = cleaned_stocks.join(yearly,
                    (cleaned_stocks.Ticker==yearly.Ticker) & (cleaned_stocks.Year == yearly.Year),
                    'inner'
                   ).drop(yearly.Year, yearly.Ticker)

In [140]:
historic_stocks.show(5)

+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|ParsedDate| Volume|  Open|   Low|  High| Close|Month|Day|Week|Ticker|Year|YearlHigh|YearlyLow|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
|2023-05-31|6175417|321.12|319.39|322.41|321.08|    5| 31|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-30|3232461|321.86| 319.0|322.47|322.19|    5| 30|  22| BRK-B|2023|    331.0|   294.68|
|2023-05-26|3229873|320.44|319.67|322.63| 320.6|    5| 26|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-25|4251935|320.56|317.71|320.56|319.02|    5| 25|  21| BRK-B|2023|    331.0|   294.68|
|2023-05-24|3075393|322.71|319.56| 323.0| 320.2|    5| 24|  21| BRK-B|2023|    331.0|   294.68|
+----------+-------+------+------+------+------+-----+---+----+------+----+---------+---------+
only showing top 5 rows



## Advanced Analysis
This section shows how to add a column to the snapshot DataFrame that holds the previous trading day's opening price for each stock. A window specification partitions the data by stock ticker and orders it by date, and the lag function then reads the opening price from the preceding row within each partition. This kind of operation is common in time series analysis, particularly with financial data.

In [141]:
snapshot = cleaned_stocks.select(['Ticker', 'ParsedDate', 'Open'])
snapshot.show()

+------+----------+------+
|Ticker|ParsedDate|  Open|
+------+----------+------+
| BRK-B|2023-05-31|321.12|
| BRK-B|2023-05-30|321.86|
| BRK-B|2023-05-26|320.44|
| BRK-B|2023-05-25|320.56|
| BRK-B|2023-05-24|322.71|
| BRK-B|2023-05-23|328.19|
| BRK-B|2023-05-22|330.75|
| BRK-B|2023-05-19| 331.0|
| BRK-B|2023-05-18|326.87|
| BRK-B|2023-05-17|325.02|
| BRK-B|2023-05-16|322.46|
| BRK-B|2023-05-15|322.89|
| BRK-B|2023-05-12|323.82|
| BRK-B|2023-05-11| 321.0|
| BRK-B|2023-05-10|326.08|
| BRK-B|2023-05-09|324.87|
| BRK-B|2023-05-08|328.26|
| BRK-B|2023-05-05|323.36|
| BRK-B|2023-05-04|323.44|
| BRK-B|2023-05-03|327.13|
+------+----------+------+
only showing top 20 rows



Window functions perform calculations across a set of rows related to the current row, such as the rows in the same group, without collapsing those rows into a single output row.

Window.partitionBy("Ticker") partitions the data by the Ticker column. Each ticker identifies a different stock, so every stock is processed separately. orderBy("ParsedDate") then sorts the rows within each partition by ParsedDate, the date of each record.

The lag function reads a value from an earlier row. Here it returns the Open value (the opening price) from the row immediately before the current one (a lag of 1). The first row of each partition has no predecessor, so its PreviousOpen is NULL.
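The partition, order, and lag steps described above can be sketched in plain Python. This is only an illustration of the semantics, with made-up rows and a hypothetical helper named `with_previous_open`; it is not how Spark executes the window.

```python
from itertools import groupby
from operator import itemgetter

# Made-up (Ticker, ParsedDate, Open) rows in arbitrary order.
rows = [
    ("AAA", "2023-01-03", 10.0),
    ("BBB", "2023-01-03", 50.0),
    ("AAA", "2023-01-04", 11.0),
    ("AAA", "2023-01-05", 9.5),
    ("BBB", "2023-01-04", 51.0),
]


def with_previous_open(rows):
    """Append the previous row's Open within each ticker, ordered by date."""
    # Sorting by (ticker, date) plays the role of partitionBy + orderBy.
    # ISO-formatted date strings sort chronologically.
    ordered = sorted(rows, key=itemgetter(0, 1))
    result = []
    for _, group in groupby(ordered, key=itemgetter(0)):
        previous = None  # the first row of a partition has no predecessor
        for ticker, date, open_price in group:
            result.append((ticker, date, open_price, previous))
            previous = open_price
    return result


lagged = with_previous_open(rows)
for row in lagged:
    print(row)
```

The `None` in the first row of each ticker corresponds to the NULL that Spark shows in PreviousOpen, and the value never leaks from one ticker into the next because `previous` is reset at every partition boundary.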


In [142]:
lag1Day = Window.partitionBy("Ticker").orderBy("ParsedDate")
snapshot.withColumn("PreviousOpen", func.lag("Open", 1).over(lag1Day)).show()


+------+----------+-----+------------+
|Ticker|ParsedDate| Open|PreviousOpen|
+------+----------+-----+------------+
|  AAPL|2018-05-31|46.81|        NULL|
|  AAPL|2018-06-01| 47.0|       46.81|
|  AAPL|2018-06-04|47.91|        47.0|
|  AAPL|2018-06-05|48.27|       47.91|
|  AAPL|2018-06-06|48.41|       48.27|
|  AAPL|2018-06-07|48.54|       48.41|
|  AAPL|2018-06-08|47.79|       48.54|
|  AAPL|2018-06-11|47.84|       47.79|
|  AAPL|2018-06-12|47.85|       47.84|
|  AAPL|2018-06-13|48.11|       47.85|
|  AAPL|2018-06-14|47.89|       48.11|
|  AAPL|2018-06-15|47.51|       47.89|
|  AAPL|2018-06-18|46.97|       47.51|
|  AAPL|2018-06-19|46.29|       46.97|
|  AAPL|2018-06-20|46.59|       46.29|
|  AAPL|2018-06-21|46.81|       46.59|
|  AAPL|2018-06-22|46.53|       46.81|
|  AAPL|2018-06-25|45.85|       46.53|
|  AAPL|2018-06-26|45.75|       45.85|
|  AAPL|2018-06-27|46.31|       45.75|
+------+----------+-----+------------+
only showing top 20 rows



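The next cell adds a column to the snapshot DataFrame that holds a moving average of the opening price, calculated separately for each ticker. Note that rowsBetween(-50, 0) covers the current row plus the 50 preceding rows, so the window spans 51 rows even though the column is named MA50; rowsBetween(-49, 0) would give a strict 50-row window. Rows near the start of a partition are averaged over however many rows are available, which is why the first MA50 value equals the first Open. Moving averages are a typical technique in financial data processing, especially for identifying trends in stock prices.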
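A small plain-Python sketch can show how a trailing row frame behaves, including at the start of a series. The function name `trailing_average` and the numbers are made up for illustration; `preceding=2` stands in for a frame like rowsBetween(-2, 0).

```python
def trailing_average(values, preceding):
    """Average each value with up to `preceding` values before it."""
    averages = []
    for i in range(len(values)):
        # The frame holds the current row plus at most `preceding` earlier rows,
        # so it contains preceding + 1 rows once enough history exists.
        window = values[max(0, i - preceding): i + 1]
        averages.append(sum(window) / len(window))
    return averages


opens = [10.0, 12.0, 14.0, 16.0, 18.0]  # made-up opening prices for one ticker
ma = trailing_average(opens, preceding=2)
print(ma)  # [10.0, 11.0, 12.0, 14.0, 16.0]
```

With `preceding=2` every full frame holds three rows, in the same way that a frame reaching back 50 rows holds 51. The first two results are averages over one and two rows respectively, because no earlier data exists.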

In [143]:

## Calculate moving average
movingAverage = Window.partitionBy("Ticker").orderBy("ParsedDate").rowsBetween(-50, 0)
(snapshot.withColumn("MA50", func.avg("Open").over(movingAverage))
         .withColumn("MA50", func.round("MA50", 2))).show()

+------+----------+-----+-----+
|Ticker|ParsedDate| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
|  AAPL|2018-06-05|48.27| 47.5|
|  AAPL|2018-06-06|48.41|47.68|
|  AAPL|2018-06-07|48.54|47.82|
|  AAPL|2018-06-08|47.79|47.82|
|  AAPL|2018-06-11|47.84|47.82|
|  AAPL|2018-06-12|47.85|47.82|
|  AAPL|2018-06-13|48.11|47.85|
|  AAPL|2018-06-14|47.89|47.86|
|  AAPL|2018-06-15|47.51|47.83|
|  AAPL|2018-06-18|46.97|47.76|
|  AAPL|2018-06-19|46.29|47.66|
|  AAPL|2018-06-20|46.59|47.59|
|  AAPL|2018-06-21|46.81|47.54|
|  AAPL|2018-06-22|46.53|47.48|
|  AAPL|2018-06-25|45.85|47.39|
|  AAPL|2018-06-26|45.75| 47.3|
|  AAPL|2018-06-27|46.31|47.25|
+------+----------+-----+-----+
only showing top 20 rows



In [144]:
maximumStock = Window.partitionBy("Ticker").orderBy(snapshot.Open.desc())
snapshot.withColumn("MaxOpen", func.row_number().over(maximumStock)).show()

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AAPL|2021-12-29|179.33|      6|
|  AAPL|2021-12-16|179.28|      7|
|  AAPL|2022-03-30|178.55|      8|
|  AAPL|2021-12-31|178.09|      9|
|  AAPL|2022-03-31|177.84|     10|
|  AAPL|2022-01-03|177.83|     11|
|  AAPL|2022-04-05| 177.5|     12|
|  AAPL|2023-05-31|177.33|     13|
|  AAPL|2021-12-27|177.09|     14|
|  AAPL|2023-05-30|176.96|     15|
|  AAPL|2022-03-29|176.69|     16|
|  AAPL|2023-05-19|176.39|     17|
|  AAPL|2022-01-12|176.12|     18|
|  AAPL|2022-02-09|176.05|     19|
|  AAPL|2021-12-23|175.85|     20|
+------+----------+------+-------+
only showing top 20 rows



In [145]:
## Top 5 highest opening prices for each stock (across all years)
snapshot.withColumn("MaxOpen", func.row_number().over(maximumStock)).filter("MaxOpen<=5").show()

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AMZN|2021-07-12| 187.2|      1|
|  AMZN|2021-07-09|186.13|      2|
|  AMZN|2021-07-07|185.87|      3|
|  AMZN|2021-11-19|185.63|      4|
|  AMZN|2021-07-14|185.44|      5|
| BRK-B|2022-03-29|361.39|      1|
| BRK-B|2022-03-28|360.59|      2|
| BRK-B|2022-03-31| 359.0|      3|
| BRK-B|2022-03-30|354.66|      4|
| BRK-B|2022-03-25| 353.9|      5|
| GOOGL|2022-02-02|151.25|      1|
| GOOGL|2021-11-19|149.98|      2|
| GOOGL|2021-11-08|149.83|      3|
| GOOGL|2021-11-22|149.33|      4|
| GOOGL|2021-11-09|149.23|      5|
+------+----------+------+-------+
only showing top 20 rows
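The ranking cells above number the rows of each ticker from the highest Open downwards and then keep ranks 1 to 5. A plain-Python sketch of that pattern follows, using made-up rows and a hypothetical helper `top_n_per_ticker`.

```python
from collections import defaultdict

# Made-up (Ticker, ParsedDate, Open) rows.
rows = [
    ("AAA", "2023-01-03", 10.0),
    ("AAA", "2023-01-04", 11.0),
    ("AAA", "2023-01-05", 9.5),
    ("BBB", "2023-01-03", 50.0),
    ("BBB", "2023-01-04", 51.0),
]


def top_n_per_ticker(rows, n):
    """Return the n rows with the highest Open per ticker, with a 1-based rank."""
    by_ticker = defaultdict(list)
    for row in rows:
        by_ticker[row[0]].append(row)  # partition by ticker

    ranked = []
    for ticker in sorted(by_ticker):
        # Order each partition by Open, highest first, then number the rows.
        ordered = sorted(by_ticker[ticker], key=lambda r: r[2], reverse=True)
        for rank, row in enumerate(ordered[:n], start=1):
            ranked.append(row + (rank,))
    return ranked


top2 = top_n_per_ticker(rows, n=2)
for row in top2:
    print(row)
```

Like row_number, this numbering gives tied prices distinct ranks in whatever order the sort leaves them. If ties should share a rank, pyspark.sql.functions also offers rank and dense_rank.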



In [146]:
result = snapshot.withColumn("MaxOpen", func.row_number().over(maximumStock)).filter("MaxOpen<=5")
moving_avg = (snapshot.withColumn("MA50", func.avg("Open").over(movingAverage))
         .withColumn("MA50", func.round("MA50", 2)))
moving_avg.show()

+------+----------+-----+-----+
|Ticker|ParsedDate| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
|  AAPL|2018-06-05|48.27| 47.5|
|  AAPL|2018-06-06|48.41|47.68|
|  AAPL|2018-06-07|48.54|47.82|
|  AAPL|2018-06-08|47.79|47.82|
|  AAPL|2018-06-11|47.84|47.82|
|  AAPL|2018-06-12|47.85|47.82|
|  AAPL|2018-06-13|48.11|47.85|
|  AAPL|2018-06-14|47.89|47.86|
|  AAPL|2018-06-15|47.51|47.83|
|  AAPL|2018-06-18|46.97|47.76|
|  AAPL|2018-06-19|46.29|47.66|
|  AAPL|2018-06-20|46.59|47.59|
|  AAPL|2018-06-21|46.81|47.54|
|  AAPL|2018-06-22|46.53|47.48|
|  AAPL|2018-06-25|45.85|47.39|
|  AAPL|2018-06-26|45.75| 47.3|
|  AAPL|2018-06-27|46.31|47.25|
+------+----------+-----+-----+
only showing top 20 rows



<a href="https://colab.research.google.com/github/suriarasai/BEAD2024/blob/main/colab/06_Ingestion_and_Data_Formats.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
PySpark provides an interface for loading DataFrames from external storage systems. In this notebook we read files in several data formats into DataFrames and write DataFrames back out in different formats, using PySpark examples. Finally, we look at how Apache Arrow transfers data efficiently between JVM and Python processes.

In [147]:

# # #from pyspark import SparkConf,SparkContext
# # from pyspark.sql import SparkSession
# # import collections
# # spark = SparkSession.builder.master("local").appName("Ingestion").config('spark.ui.port', '4050').getOrCreate()

# df_csv = spark.read.format('csv') \
#                 .option("inferSchema","true") \
#                 .option("header","true") \
#                 .option("sep",";") \
#                 .load("/content/drive/MyDrive/data/DataFormat/people.csv")

# df_json = spark.read.format('json') \
#                 .option("inferSchema","true") \
#                 .option("header","true") \
#                 .option("sep",";") \
#                 .load("/content/drive/MyDrive/data/DataFormat/people.json")

# peopleDF = spark.read.json("/content/drive/MyDrive/data/DataFormat/people.json")

# import pyarrow.csv as pv
# import pyarrow.parquet as pq

# # read hdb resale price
# hdb_table = pv.read_csv("/content/drive/MyDrive/data/DataFormat/resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.csv")
# # convert the CSV file to a Parquet file
# pq.write_table(hdb_table,'resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')
# hdb_parquet = pq.ParquetFile('resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')


In [148]:
# from google.colab import drive
# drive.mount('/content/drive')

# Read CSV file
PySpark provides DataFrameReader to load a DataFrame from external storage systems (e.g. file systems, key-value stores). Access it through SparkSession.read, and use format(source) to specify the input data source format.  
Using csv("path") or format("csv").load("path") of DataFrameReader, you can read a CSV file into a PySpark DataFrame. Both take the file path to read as an argument. With format(...), you can name a data source by its fully qualified name, but for built-in sources the short names suffice (csv, json, parquet, jdbc, text, etc.).
The example below reads a single CSV file, people.csv, into a DataFrame. The file has a header row and uses ";" as the separator, and the column types are inferred with the inferSchema option rather than taken from a user-defined schema.
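To illustrate what the header, sep, and schema settings mean for a file like people.csv, here is a standard-library sketch that parses semicolon-separated text and applies an explicit column-to-type mapping. The inline text, the `schema` dictionary, and the helper `read_csv_with_schema` are assumptions made for illustration. In PySpark itself an explicit schema would instead be passed to the reader through its schema(...) method.

```python
import csv
import io

# Assumed file content: a header row, then ";"-separated records.
raw = "name;age;job\nJorge;30;Developer\nBob;32;Developer\n"

# Hypothetical explicit schema: column name -> Python type used for casting.
schema = {"name": str, "age": int, "job": str}


def read_csv_with_schema(text, schema, sep=";"):
    """Parse delimited text with a header row and cast each column per schema."""
    # DictReader consumes the first line as the header, like header=true.
    reader = csv.DictReader(io.StringIO(text), delimiter=sep)
    return [
        {column: cast(record[column]) for column, cast in schema.items()}
        for record in reader
    ]


people = read_csv_with_schema(raw, schema)
print(people)
```

Without a schema every field would stay a string. Schema inference spares you from writing the mapping by hand, at the cost of Spark reading the data to guess the types.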


In [149]:
# Read CSV file people.csv
# Show result
df_csv.show()
# Print schema
df_csv.printSchema()


+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)



In [150]:
# # Write csv file
# df = spark.read.format('csv') \
#                 .option("inferSchema","true") \
#                 .option("header","true") \
#                 .option("sep",";") \
#                 .load("people.csv")


In [151]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.format("parquet").mode("overwrite").save("people.parquet")

In [152]:
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()


+------+
|  name|
+------+
|Justin|
+------+



In [153]:
# inspect the parquet metadata
print(hdb_parquet.metadata)
# inspect the parquet row group metadata
print(hdb_parquet.metadata.row_group(0))
# inspect the column chunk metadata
print(hdb_parquet.metadata.row_group(0).column(9).statistics)

<pyarrow._parquet.FileMetaData object at 0x7d4befa652b0>
  created_by: parquet-cpp-arrow version 10.0.1
  num_columns: 10
  num_rows: 52203
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2079
<pyarrow._parquet.RowGroupMetaData object at 0x7d4be41243b0>
  num_columns: 10
  num_rows: 52203
  total_byte_size: 431095
<pyarrow._parquet.Statistics object at 0x7d4bdebd4c70>
  has_min_max: True
  min: 195000.0
  max: 1088888.0
  null_count: 0
  distinct_count: 0
  num_values: 52203
  physical_type: DOUBLE
  logical_type: None
  converted_type (legacy): NONE
