<a href="https://colab.research.google.com/github/MapathaMatiba/processing-big-data-predict/blob/main/Data_ingestion_student.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processing Big Data - Data Ingestion
© Explore Data Science Academy

## Honour Code
I {**YOUR NAME**, **YOUR SURNAME**}, confirm - by submitting this document - that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
    Non-compliance with the honour code constitutes a material breach of contract.



## Context

To work constructively with any dataset, one needs to create an ingestion profile to make sure that the data at the source can be readily consumed. For this section of the predict, as the Data Engineer in the team, you will be required to design and implement the ingestion process. For the purposes of the project the AWS cloud storage service, namely, the S3 bucket service will act as your data source. All the data required can be found [here](https://processing-big-data-predict-stocks-data.s3.eu-west-1.amazonaws.com/stocks.zip).

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/data_engineering/transform/predict/DataIngestion.jpg"
     alt="Data Ingestion"
     style="float: center; padding-bottom=0.5em"
     width=40%/>
     <p><em>Figure 1. Data Ingestion</em></p>
</div>

Your manager, Gnissecorp Atadgib, knowing very well that you've recently completed your Data Engineering qualification, asks you to make use of Apache Spark for the ingestion as well as the rest of the project. His rationale being, that stock market data is generated every day and is quite time-sensitive and would require scalability when deploying to a production environment.

## Dataset - US Nasdaq




<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/data_engineering/transform/predict/Nasdaq.png"
     alt="Nasdaq"
     style="float: center; padding-bottom=0.5em"
     width=50%/>
     <p><em>Figure 2. Nasdaq</em></p>
</div>

The data that you will be working with is a historical snapshot of market data taken from the Nasdaq electronic market. This dataset contains historical daily prices for all tickers currently trading on Nasdaq. The up-to-date list can be found on their [website](https://www.nasdaq.com/)


The provided data contains price data dating back from 02 January 1962 up until 01 April 2020. The data found in the S3 bucket has been stored in the following structure:

```
     stocks/<Year>/<Month>/<Day>/stocks.csv
```
Each CSV file for every trading day contains the following details:
- **Date** - specifies trading date
- **Open** - opening price
- **High** - maximum price during the day
- **Low** - minimum price during the day
- **Close** - close price adjusted for splits
- **Adj Close** - close price adjusted for both dividends and splits
- **Volume** - the number of shares that changed hands during a given day

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Basic initialisation
To get you started, let's import some basic Python libraries as well as Spark modules and functions.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=572b3df57c6bf202287be687b34d0a8b8eb6052c7ea779c726f4a557cae29b20
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

Remember that we need a `SparkContext` and `SparkSession` to interface with Spark.
We will mostly be using the `SparkContext` to interact with RDDs and the `SparkSession` to interface with Python objects.

> ℹ️ **Instructions** ℹ️
>
>Initialise a new **Spark Context** and **Session** that you will use to interface with Spark.

In [3]:
#TODO: Write your code here

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Predict") \
    .getOrCreate()

# Verify the Spark Context and Session are initialized
print("Spark Session initialized:", spark)

Spark Session initialized: <pyspark.sql.session.SparkSession object at 0x7cdb4476f850>


## Investigate dataset schema
At this point, it is enough to read in a single file to ascertain the data structure. You will be required to use the information obtained from the small subset to create a data schema. This data schema will be used when reading the entire dataset using Spark.

> ℹ️ **Instructions** ℹ️
>
>Make use of Pandas to read in a single file and investigate the plausible data types to be used when creating a Spark data schema.
>
>*You may use as many coding cells as necessary.*

In [7]:
import pandas as pd

# Correct file path for the CSV file
file_path = '/content/drive/My Drive/1962/01/02/stocks.csv'

# Read the CSV file using Pandas
df = pd.read_csv(file_path)

# Display the first few rows of the dataframe
print(df.head())

# Display the data types of each column
print(df.dtypes)


         Date      Open      High       Low     Close  Adj Close    Volume  \
0  1962-01-02  6.532155  6.556185  6.532155  6.532155   1.536658   55900.0   
1  1962-01-02  6.125844  6.160982  6.125844  6.125844   1.414651   59700.0   
2  1962-01-02  0.837449  0.837449  0.823045  0.823045   0.145748  352200.0   
3  1962-01-02  1.604167  1.619792  1.588542  1.604167   0.136957  163200.0   
4  1962-01-02  0.000000  3.296131  3.244048  3.296131   0.051993  105600.0   

  stock  
0    AA  
1  ARNC  
2    BA  
3   CAT  
4   CVX  
Date          object
Open         float64
High         float64
Low          float64
Close        float64
Adj Close    float64
Volume       float64
stock         object
dtype: object


In [8]:
# Display summary statistics to understand the data better
print(df.describe())

# Display the data types of each column
print(df.dtypes)

            Open        High         Low       Close     Adj Close  \
count  20.000000   20.000000   20.000000   20.000000  2.000000e+01   
mean    1.202568   18.146229   17.829750   17.967168  6.248174e+00   
std     2.457011   58.093799   56.989958   57.542460  2.597106e+01   
min     0.000000    0.096026    0.092908    0.092908  6.281419e-07   
25%     0.000000    0.640337    0.623523    0.627279  1.185386e-02   
50%     0.000000    2.457961    2.416295    2.450149  1.413525e-01   
75%     0.772764    8.051069    7.933194    7.933194  8.265629e-01   
max     7.713333  263.125000  258.125000  260.625000  1.165582e+02   

             Volume  
count  2.000000e+01  
mean   4.490900e+05  
std    7.027279e+05  
min    0.000000e+00  
25%    4.445000e+04  
50%    1.344000e+05  
75%    4.920000e+05  
max    2.480300e+06  
Date          object
Open         float64
High         float64
Low          float64
Close        float64
Adj Close    float64
Volume       float64
stock         object
dty

In [9]:
from pyspark.sql.types import StructType, StructField, DateType, StringType, FloatType, LongType

# Define the schema for the stock data
schema = StructType([
    StructField("Date", DateType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Adj Close", FloatType(), True),
    StructField("Volume", LongType(), True)
])

# Display the schema
print(schema)


StructType([StructField('Date', DateType(), True), StructField('Open', FloatType(), True), StructField('High', FloatType(), True), StructField('Low', FloatType(), True), StructField('Close', FloatType(), True), StructField('Adj Close', FloatType(), True), StructField('Volume', LongType(), True)])


## Read CSV files

When working with big data, it is often not tenable to keep processing an entire data batch when you are in the process of development - this can be quite time-consuming. If the data is uniform, it is sufficient to work with a smaller subset to create basic functionality. Your manager has identified the year **1962** to perform the initial testing for data ingestion.

> ℹ️ **Instructions** ℹ️
>
>Read in the data for **1962** using a data schema that purely uses string data types. You will be required to convert to the appropriate data types at a later stage.
>
>*You may use as many coding cells as necessary.*

In [11]:
#TODO: Write your code here

from pyspark.sql.types import StructType, StructField, StringType

# Define the schema with all fields as StringType
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", StringType(), True),
    StructField("High", StringType(), True),
    StructField("Low", StringType(), True),
    StructField("Close", StringType(), True),
    StructField("Adj Close", StringType(), True),
    StructField("Volume", StringType(), True)
])

# Step 4: Reading the CSV File
# Define the file path
file_path = "/content/drive/My Drive/1962"  # Update this with your actual file path

# Read the CSV file using the defined schema
df_1962 = spark.read.csv(file_path, header=True, schema=schema)

# Step 5: Verifying the Data
# Print the schema to verify data types
df_1962.printSchema()

# Show a sample of the data
df_1962.show(10)

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)

+----+----+----+---+-----+---------+------+
|Date|Open|High|Low|Close|Adj Close|Volume|
+----+----+----+---+-----+---------+------+
+----+----+----+---+-----+---------+------+



## Update column names
To make the data easier to work with, you will need to make a few changes:
1. Column headers should all be in lowercase; and
2. Whitespaces should be replaced with underscores.


> ℹ️ **Instructions** ℹ️
>
>Make sure that the column headers are all in lowercase and that any whitespaces are replaced with underscores.
>
>*You may use as many coding cells as necessary.*

In [12]:
#TODO: Write your code here
import pandas as pd
import os

# Set the root directory for 1962 data
root_dir = "/content/drive/My Drive/1962"

# Create an empty list to store all DataFrames
all_dfs = []

# Function to clean column names
def clean_column_names(df):
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df

# Walk through the directory structure
for dirpath, dirnames, filenames in os.walk(root_dir):
    for filename in filenames:
        if filename.endswith('.csv'):
            file_path = os.path.join(dirpath, filename)

            # Read the CSV file as a whole
            df = pd.read_csv(file_path, dtype=str)

            # Clean column names for the DataFrame
            df = clean_column_names(df)

            # Add a column to identify the source file
            df['source_file'] = filename

            # Append to the list of all DataFrames
            all_dfs.append(df)

# Concatenate all DataFrames into a single DataFrame
df_1962 = pd.concat(all_dfs, ignore_index=True)

# Print the first few rows and info about the DataFrame
print(df_1962.head())
print(df_1962.info())

# Print column names to verify the changes
print("\nUpdated column names:")
print(df_1962.columns.tolist())

         date                open                high                 low  \
0  1962-09-21    5.60699987411499    5.60699987411499   5.446800231933594   
1  1962-09-21   5.247376441955566   5.247376441955566   5.118534564971924   
2  1962-09-21  0.6502057909965515  0.6563786268234253  0.6378600597381592   
3  1962-09-21             1.40625             1.40625            1.390625   
4  1962-09-21                 0.0  3.5714285373687744   3.497023820877075   

                close            adj_close    volume stock source_file  
0    5.45481014251709   1.3031203746795654   25900.0    AA  stocks.csv  
1   5.130247592926025   1.2030487060546875   27700.0  ARNC  stocks.csv  
2  0.6419752836227417  0.11746639013290405  959600.0    BA  stocks.csv  
3             1.40625   0.1225220113992691  230400.0   CAT  stocks.csv  
4   3.497023820877075  0.05664148554205895  192000.0   CVX  stocks.csv  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5106 entries, 0 to 5105
Data columns (total 9 col

## Null Values
Null values often represent missing pieces of data. It is always good to know where your null values lie - so you can quickly identify and remedy any issues stemming from these.

> ℹ️ **Instructions** ℹ️
>
>Write code to count the number of null values found in each column.
>
>*You may use as many coding cells as necessary.*

In [13]:
#TODO: Write your code here

import pandas as pd
import os

# Set the root directory for 1962 data
root_dir = "/content/drive/My Drive/1962"

# Create an empty list to store all DataFrames
all_dfs = []

# Function to clean column names
def clean_column_names(df):
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df

# Walk through the directory structure
for dirpath, dirnames, filenames in os.walk(root_dir):
    for filename in filenames:
        if filename.endswith('.csv'):
            file_path = os.path.join(dirpath, filename)

            # Read the CSV file as a whole
            df = pd.read_csv(file_path, dtype=str)

            # Clean column names
            df = clean_column_names(df)

            # Add a column to identify the source file
            df['source_file'] = filename

            # Append to the list of all DataFrames
            all_dfs.append(df)

# Concatenate all DataFrames into a single DataFrame
df_1962 = pd.concat(all_dfs, ignore_index=True)

# Print the first few rows and info about the DataFrame
print(df_1962.head())
print(df_1962.info())

# Print column names to verify the changes
print("\nUpdated column names:")
print(df_1962.columns.tolist())

# Count null values in each column
null_counts = df_1962.isnull().sum()

# Print the count of null values for each column
print("\nNull value counts for each column:")
print(null_counts)

# Calculate and print the percentage of null values in each column
null_percentages = (null_counts / len(df_1962)) * 100
print("\nPercentage of null values in each column:")
print(null_percentages)

# Identify columns with null values
columns_with_nulls = null_counts[null_counts > 0].index.tolist()
print("\nColumns containing null values:")
print(columns_with_nulls)

         date                open                high                 low  \
0  1962-09-21    5.60699987411499    5.60699987411499   5.446800231933594   
1  1962-09-21   5.247376441955566   5.247376441955566   5.118534564971924   
2  1962-09-21  0.6502057909965515  0.6563786268234253  0.6378600597381592   
3  1962-09-21             1.40625             1.40625            1.390625   
4  1962-09-21                 0.0  3.5714285373687744   3.497023820877075   

                close            adj_close    volume stock source_file  
0    5.45481014251709   1.3031203746795654   25900.0    AA  stocks.csv  
1   5.130247592926025   1.2030487060546875   27700.0  ARNC  stocks.csv  
2  0.6419752836227417  0.11746639013290405  959600.0    BA  stocks.csv  
3             1.40625   0.1225220113992691  230400.0   CAT  stocks.csv  
4   3.497023820877075  0.05664148554205895  192000.0   CVX  stocks.csv  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5106 entries, 0 to 5105
Data columns (total 9 col

## Data type conversion - The final data schema

Now that we have identified the number of missing values in the data set, we'll move on to convert our data schema to the required data types.

> ℹ️ **Instructions** ℹ️
>
>Use typecasting to convert the string data types in your current data schema to more appropriate data types.
>
>*You may use as many coding cells as necessary.*

In [14]:
#TODO: Write your code here

import pandas as pd
import os

# Set the root directory for 1962 data
root_dir = "/content/drive/My Drive/1962"

# Create an empty list to store all DataFrames
all_dfs = []

# Function to clean column names
def clean_column_names(df):
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df

# Function to convert data types
def convert_dtypes(df):
    # Define the data types for each column
    dtypes = {
        'date': 'datetime64[ns]',
        'symbol': 'category',
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'int64',
        'source_file': 'category'
    }

    # Convert each column to its appropriate data type
    for col, dtype in dtypes.items():
        if col in df.columns:
            if dtype == 'datetime64[ns]':
                df[col] = pd.to_datetime(df[col], errors='coerce')
            elif dtype in ['float64', 'int64']:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            else:
                df[col] = df[col].astype(dtype)

    return df

# Walk through the directory structure
for dirpath, dirnames, filenames in os.walk(root_dir):
    for filename in filenames:
        if filename.endswith('.csv'):
            file_path = os.path.join(dirpath, filename)

            # Read the entire CSV file at once
            df = pd.read_csv(file_path, dtype=str)

            # Clean column names
            df = clean_column_names(df)

            # Add a column to identify the source file
            df['source_file'] = filename

            # Convert data types
            df = convert_dtypes(df)

            # Append to the list of all DataFrames
            all_dfs.append(df)

# Concatenate all DataFrames into a single DataFrame
df_1962 = pd.concat(all_dfs, ignore_index=True)

# Print the first few rows and info about the DataFrame
print(df_1962.head())
print(df_1962.info())

# Print column names and their data types
print("\nColumn names and data types:")
print(df_1962.dtypes)

# Count null values in each column
null_counts = df_1962.isnull().sum()

# Print the count of null values for each column
print("\nNull value counts for each column:")
print(null_counts)

# Calculate and print the percentage of null values in each column
null_percentages = (null_counts / len(df_1962)) * 100
print("\nPercentage of null values in each column:")
print(null_percentages)


        date      open      high       low     close            adj_close  \
0 1962-09-21  5.607000  5.607000  5.446800  5.454810   1.3031203746795654   
1 1962-09-21  5.247376  5.247376  5.118535  5.130248   1.2030487060546875   
2 1962-09-21  0.650206  0.656379  0.637860  0.641975  0.11746639013290405   
3 1962-09-21  1.406250  1.406250  1.390625  1.406250   0.1225220113992691   
4 1962-09-21  0.000000  3.571429  3.497024  3.497024  0.05664148554205895   

     volume stock source_file  
0   25900.0    AA  stocks.csv  
1   27700.0  ARNC  stocks.csv  
2  959600.0    BA  stocks.csv  
3  230400.0   CAT  stocks.csv  
4  192000.0   CVX  stocks.csv  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5106 entries, 0 to 5105
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   date         5106 non-null   datetime64[ns]
 1   open         5106 non-null   float64       
 2   high         5106 non-null   float64  

## Consolidate missing values
We have to check if the data type conversion above was done correctly.
If the casting was not successful, a null value gets inserted into the dataframe. You can thus check for successful conversion by determining if any null values are included in the resulting dataframe.


> ℹ️ **Instructions** ℹ️
>
>Write code to compare the number of invalid entries (nulls) pre-conversion and post-conversion.
>
>*You may use as many coding cells as necessary.*

In [15]:
#TODO: Write your code here

import pandas as pd
import os

# Set the root directory for 1962 data
root_dir = "/content/drive/My Drive/1962"

# Create an empty list to store all DataFrames
all_dfs = []

# Function to clean column names
def clean_column_names(df):
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df

# Function to convert data types
def convert_dtypes(df):
    # Define the data types for each column
    dtypes = {
        'date': 'datetime64[ns]',
        'symbol': 'category',
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'int64',
        'source_file': 'category'
    }

    # Convert each column to its appropriate data type
    for col, dtype in dtypes.items():
        if col in df.columns:
            if dtype == 'datetime64[ns]':
                df[col] = pd.to_datetime(df[col], errors='coerce')
            elif dtype in ['float64', 'int64']:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            else:
                df[col] = df[col].astype(dtype)

    return df

# Walk through the directory structure
for dirpath, dirnames, filenames in os.walk(root_dir):
    for filename in filenames:
        if filename.endswith('.csv'):
            file_path = os.path.join(dirpath, filename)

            # Read the entire CSV file at once
            df = pd.read_csv(file_path, dtype=str)

            # Clean column names
            df = clean_column_names(df)

            # Add a column to identify the source file
            df['source_file'] = filename

            # Append to the list of all DataFrames
            all_dfs.append(df)

# Concatenate all DataFrames into a single DataFrame
df_1962 = pd.concat(all_dfs, ignore_index=True)

# Count null values before conversion
null_counts_before = df_1962.isnull().sum()

# Convert data types
df_1962_converted = convert_dtypes(df_1962.copy())

# Count null values after conversion
null_counts_after = df_1962_converted.isnull().sum()

# Compare null counts
print("Null value counts before conversion:")
print(null_counts_before)
print("\nNull value counts after conversion:")
print(null_counts_after)

# Calculate and print the difference in null counts
null_count_diff = null_counts_after - null_counts_before
print("\nDifference in null counts (after - before):")
print(null_count_diff)

# Identify columns where null counts increased
problematic_columns = null_count_diff[null_count_diff > 0].index.tolist()
print("\nColumns with increased null counts after conversion:")
print(problematic_columns)

# Print data types of converted DataFrame
print("\nData types after conversion:")
print(df_1962_converted.dtypes)

# Optional: Sample rows with new nulls in problematic columns
if problematic_columns:
    print("\nSample rows with new nulls in problematic columns:")
    for col in problematic_columns:
        mask = df_1962[col].notnull() & df_1962_converted[col].isnull()
        if mask.any():
            print(f"\nColumn: {col}")
            print(df_1962.loc[mask, col].head())
            print("Converted to:")
            print(df_1962_converted.loc[mask, col].head())


Null value counts before conversion:
date            0
open            0
high            0
low            22
close           0
adj_close       0
volume         21
stock           0
source_file     0
dtype: int64

Null value counts after conversion:
date            0
open            0
high            0
low            42
close           0
adj_close       0
volume         21
stock           0
source_file     0
dtype: int64

Difference in null counts (after - before):
date            0
open            0
high            0
low            20
close           0
adj_close       0
volume          0
stock           0
source_file     0
dtype: int64

Columns with increased null counts after conversion:
['low']

Data types after conversion:
date           datetime64[ns]
open                  float64
high                  float64
low                   float64
close                 float64
adj_close              object
volume                float64
stock                  object
source_file          cat

Here you should be able to see if any of your casts went wrong.
Do not attempt to correct any missing values at this point. This will be dealt with in later sections of the predict.

## Generate parquet files
When writing in Spark, we typically use parquet format. This format allows parallel writing using Spark's optimisation while maintaining other useful things like metadata.

When writing, it is good to make sure that the data is sufficiently partitioned.

Generally, data should be partitioned with one partition for every 200MB of data, but this also depends on the size of your cluster and executors.


### Check the size of the dataframe before partitioning

In [None]:
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

In [None]:
rdd = df.rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
obj = rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
size = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(obj)
size_MB = size/1000000
partitions = max(int(size_MB/200), 2)
print(f'The dataframe is {size_MB} MB')

### Write parquet files to the local directory
> ℹ️ **Instructions** ℹ️
>
> Use the **coalesce** function and the number of **partitions** derived above to write parquet files to your local directory
>
>*You may use as many coding cells as necessary.*

In [None]:
#TODO: Write your code here

!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

# Set environment variables for Spark and Java
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# Initialize Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Create Spark session with additional configuration
spark = SparkSession.builder \
    .appName("Mapatha Matiba Shaun") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Define root directory for 2007 data
root_dir = "/content/drive/My Drive/1962"

# Function to read CSV files and clean column names
def read_and_clean_csv(file_path):
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    for column in df.columns:
        df = df.withColumnRenamed(column, column.lower().replace(" ", "_"))
    return df

# Read all CSV files from the directory
df_1962 = None
first_file = True

for dirpath, dirnames, filenames in os.walk(root_dir):
    for filename in filenames:
        if filename.endswith('.csv'):
            file_path = os.path.join(dirpath, filename)
            df_temp = read_and_clean_csv(file_path)
            df_temp = df_temp.withColumn("source_file", lit(filename))

            # Union all DataFrames together
            if first_file:
                df_1962 = df_temp
                first_file = False
            else:
                df_1962 = df_1962.union(df_temp)

# Check if any data was loaded
if df_1962 is None:
    raise ValueError("No CSV files found in the specified directory")

# Convert data types
df_1962 = df_1962.withColumn("date", col("date").cast("date"))
df_1962 = df_1962.withColumn("open", col("open").cast("double"))
df_1962 = df_1962.withColumn("high", col("high").cast("double"))
df_1962 = df_1962.withColumn("low", col("low").cast("double"))
df_1962 = df_1962.withColumn("close", col("close").cast("double"))
df_1962 = df_1962.withColumn("volume", col("volume").cast("long"))

# Calculate the size and number of partitions
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

rdd = df_1962.rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
obj = rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
size = spark.sparkContext._jvm.org.apache.spark.util.SizeEstimator.estimate(obj)
size_MB = size / 1000000
partitions = max(int(size_MB / 200), 2)
print(f'The dataframe is {size_MB:.2f} MB')
print(f'Number of partitions: {partitions}')

# Define the output directory for parquet files
output_dir = "/content/drive/My Drive/1962/1962_parquet"

# Write parquet files
df_1962.coalesce(partitions).write.mode("overwrite").partitionBy("source_file").parquet(output_dir)
print(f"Parquet files have been written to {output_dir}")

# Stop the Spark session
spark.stop()
