# PySpark Data Engineering Project

### Using PySpark

## Importing the Data

### Creating the Pyspark SQL Dataframe

In [1]:
# !pip install pyspark
# !pip install pandas

In [2]:
from pyspark.sql import SparkSession

In [3]:
# pd.read_csv('eurusd_hour.csv').head()
# type(pd.read_csv('DataSet/eurusd_hour.csv'))

# Sample DataFrame
# data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
# columns = ["ID", "Name", "Age"]

# # Create a Spark session
# spark = SparkSession.builder.appName("example").getOrCreate()

# # Create a DataFrame
# df = spark.createDataFrame(data, columns)

In [4]:
spark = SparkSession.builder.appName("Project").getOrCreate()
# spark = SparkSession.builder.appName("Project").config("spark.rpc.message.maxSize", 2147483647).getOrCreate()

In [5]:
spark

In [6]:
df_raw = spark.read.option('header','true').csv('DataSet/eurusd_hour.csv')
df_raw.show(3)

+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-0.00079999999999...|1.2854|1.2854|1.2842|1.2846|-0.00080000000000...|
|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-0.00019999999999...|1.2846| 1.285|1.2841|1.2844|-0.00019999999999...|
|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|0.000799999999999...|1.2845|1.2856|1.2843|1.2853|0.000800000000000...|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
only showing top 3 rows



In [7]:
df_pyspark = spark.read.option('header','true').csv('DataSet/eurusd_hour.csv', inferSchema = True)
df_pyspark

DataFrame[Date: date, Time: timestamp, BO: double, BH: double, BL: double, BC: double, BCh: double, AO: double, AH: double, AL: double, AC: double, ACh: double]

In [8]:
df_pyspark.show(5)

+----------+-------------------+------+------+------+------+--------------------+------+------+------+------+--------------------+
|      Date|               Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+----------+-------------------+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02|2023-10-03 00:00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02|2023-10-03 01:00:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02|2023-10-03 02:00:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
|2005-05-02|2023-10-03 03:00:00|1.2851|1.2859| 1.285|1.2851|                 0.0|1.2853|1.2861|1.2852|1.2853|                 0.0|
|2005-05-02|2023-10-03 04:00:00|1.2852|1.2859|1.2849|1.2855| 3.00000000000189E-4|1.

In [9]:
type(df_pyspark)
# this is a pyspark sql dataframe not a pandas dataframe

pyspark.sql.dataframe.DataFrame

In [10]:
# Checking the Schema
df_pyspark.printSchema()
# same as df.info() - gives information about our columns

root
 |-- Date: date (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- BO: double (nullable = true)
 |-- BH: double (nullable = true)
 |-- BL: double (nullable = true)
 |-- BC: double (nullable = true)
 |-- BCh: double (nullable = true)
 |-- AO: double (nullable = true)
 |-- AH: double (nullable = true)
 |-- AL: double (nullable = true)
 |-- AC: double (nullable = true)
 |-- ACh: double (nullable = true)



In [11]:
# double checking the column types / schema of df
df_pyspark.dtypes

[('Date', 'date'),
 ('Time', 'timestamp'),
 ('BO', 'double'),
 ('BH', 'double'),
 ('BL', 'double'),
 ('BC', 'double'),
 ('BCh', 'double'),
 ('AO', 'double'),
 ('AH', 'double'),
 ('AL', 'double'),
 ('AC', 'double'),
 ('ACh', 'double')]

In [12]:
df_pyspark.show(3)

+----------+-------------------+------+------+------+------+--------------------+------+------+------+------+--------------------+
|      Date|               Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+----------+-------------------+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02|2023-10-03 00:00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02|2023-10-03 01:00:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02|2023-10-03 02:00:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
+----------+-------------------+------+------+------+------+--------------------+------+------+------+------+--------------------+
only showing top 3 rows



As we can see here an error was introduced in the Time COLUMN. 

A timestamp with the current date was introduced additionally to the time in the Time column 

Let's fix this:

In [13]:
from pyspark.sql.functions import date_format

# keeping only the hours and minutes of the time column
df = df_pyspark.withColumn("Time", date_format("Time", "HH:mm"))
df.show(5)

+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
|2005-05-02|03:00|1.2851|1.2859| 1.285|1.2851|                 0.0|1.2853|1.2861|1.2852|1.2853|                 0.0|
|2005-05-02|04:00|1.2852|1.2859|1.2849|1.2855| 3.00000000000189E-4|1.2854|1.2861|1.2851|1.2857|2.999999999999669...|
+----------+-----+------+------+------+------+------------------

In [14]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- BO: double (nullable = true)
 |-- BH: double (nullable = true)
 |-- BL: double (nullable = true)
 |-- BC: double (nullable = true)
 |-- BCh: double (nullable = true)
 |-- AO: double (nullable = true)
 |-- AH: double (nullable = true)
 |-- AL: double (nullable = true)
 |-- AC: double (nullable = true)
 |-- ACh: double (nullable = true)



The type of the time column has become string now

In [15]:
from pyspark.sql.functions import concat, col

# Creating a new column called Datetime which will be a concatination of the data and time columns
df = df.withColumn("DateTime", concat(col("Date"), col("Time")))
df.show(3)

+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+---------------+
|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|       DateTime|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+---------------+
|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|2005-05-0200:00|
|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|2005-05-0201:00|
|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|2005-05-0202:00|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+---------------+
only showing top 3 rows



lets fix the format and also the type of the datetime column

By fixing the format I mean: add a space between the date and the time

In [16]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("DateTime", to_timestamp(col("DateTime"), "yyyy-MM-ddHH:mm"))
df.show(3)

+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+-------------------+
|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|           DateTime|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+-------------------+
|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|2005-05-02 00:00:00|
|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|2005-05-02 01:00:00|
|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|2005-05-02 02:00:00|
+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+-------------------+
only showing top 3 rows



In [17]:
# changing the order of the columns in the PySpark DataFrame
df = df.select("DateTime", "Date", "Time", "BO", "BH", "BL", "BC", "BCh", "AO", "AH", "AL", "AC", "ACh")
df.show(3)

+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|           DateTime|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02 00:00:00|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02 01:00:00|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02 02:00:00|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
only showing top 3 rows



In [18]:
# checking the column types / schema of df
df.printSchema()

root
 |-- DateTime: timestamp (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- BO: double (nullable = true)
 |-- BH: double (nullable = true)
 |-- BL: double (nullable = true)
 |-- BC: double (nullable = true)
 |-- BCh: double (nullable = true)
 |-- AO: double (nullable = true)
 |-- AH: double (nullable = true)
 |-- AL: double (nullable = true)
 |-- AC: double (nullable = true)
 |-- ACh: double (nullable = true)



In [19]:
type(df)

pyspark.sql.dataframe.DataFrame

## Column and Data Analysis 

### Data Description

- **Date**: This column contains the date on which the financial data was recorded. Each row in the DataFrame corresponds to a specific date.

- **Time**: This column provides the timestamp for each data point, indicating the exact time when the data was captured. This level of granularity allows for tracking changes throughout the trading day.

- **Bid Open (BO)**: The "Bid Open" column represents the initial bid price at the start of a specific time interval. It's essentially the first price at which traders are willing to buy the EUR/USD currency pair during that interval.

- **Bid High (BH)**: This column shows the highest bid price reached during the same time interval. It tells us the peak price that buyers were willing to pay within that period.

- **Bid Low (BL)**: Here, we have the lowest bid price observed during the time interval, indicating the lowest price at which traders were willing to buy the currency pair.

- **Bid Close (BC)**: The closing bid price signifies the last recorded bid price at the end of the time interval, which can provide insights into how trading sentiment may have evolved during that period.

- **Bid Change (BCh)**: This column seems to represent the change in bid price during the interval, possibly calculated as the difference between the opening and closing bid prices. It helps track price movements.

- **Ask Open (AO)**: Similar to the bid open, this is the initial asking price for the EUR/USD currency pair at the beginning of the time interval. It represents the first price at which sellers are willing to sell.

- **Ask High (AH)**: The highest asking price during the time interval is found here. It indicates the peak price sellers were requesting for the currency pair.

- **Ask Low (AL)**: The lowest asking price observed during the interval is listed in this column, representing the lowest price sellers were willing to accept.

- **Ask Close (AC)**: The closing asking price signifies the last recorded asking price at the end of the time interval. Like the closing bid price, it offers insights into potential shifts in market sentiment.

- **Ask Change (ACh)**: Similar to the bid change, this column appears to represent the change in asking price during the interval, possibly calculated as the difference between the opening and closing asking prices. It helps monitor price fluctuations from the perspective of sellers.

This dataset provides a comprehensive view of trading activity for the EUR/USD currency pair, including open, high, low, and close prices for both bid and ask prices. It's invaluable for financial analysis and can be used to analyze market trends, make trading decisions, and gain insights into the behavior of currency pairs in the foreign exchange market.


### Let's check the dimensions of our Pyspark SQL Dataframe

In [20]:
df.show(3)

+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|           DateTime|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02 00:00:00|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02 01:00:00|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02 02:00:00|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
only showing top 3 rows



In [21]:
# and 13 columns ( 1 that I introduced - DateTime )
# we have 93084 rows in our dataframe
print(f"Number of Rows: {df.count()}")
print(f"Number of Columns: {len(df.columns)}")

Number of Rows: 93084
Number of Columns: 13


In [22]:
df_pyspark.count()
# we have the same number of rows

93084

### Data Exploration

Apparently we have 6 days of week worth of data in our dataset

In [23]:
from pyspark.sql.functions import col, sum

sorted_dates = df_pyspark.select('Date').distinct().orderBy(col('Date'))

sorted_dates.show()

+----------+
|      Date|
+----------+
|2005-05-02|
|2005-05-03|
|2005-05-04|
|2005-05-05|
|2005-05-06|
|2005-05-08|
|2005-05-09|
|2005-05-10|
|2005-05-11|
|2005-05-12|
|2005-05-13|
|2005-05-15|
|2005-05-16|
|2005-05-17|
|2005-05-18|
|2005-05-19|
|2005-05-20|
|2005-05-22|
|2005-05-23|
|2005-05-24|
+----------+
only showing top 20 rows



Let's find out why we have data for 6 days a week instead of 5

Forex (foreign exchange) markets are generally open and available for trading 24 hours a day, five days a week. The forex market operates globally and is divided into different trading sessions, which include:

Sydney Session: This session starts at 10:00 PM GMT and ends at 7:00 AM GMT. It is the first major session to open.

Tokyo Session: Following the Sydney session, the Tokyo session begins at 11:00 PM GMT and ends at 8:00 AM GMT.

London Session: The London session is one of the most significant and active trading sessions. It starts at 8:00 AM GMT and ends at 4:00 PM GMT.

New York Session: The New York session overlaps with the London session and is open from 1:00 PM GMT to 10:00 PM GMT. This overlap period is typically the busiest time for forex trading.

During these trading sessions, currency pairs like EUR/USD (Euro/US Dollar) are actively traded. However, it's important to note that the forex market is closed on weekends (from Friday evening until Sunday evening GMT) and during certain holidays when major financial centers are closed.

So apparently forex trading is generally available every day from Sunday evening GMT to Friday evening GMT,

In [24]:
df.show(3)

+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|           DateTime|      Date| Time|    BO|    BH|    BL|    BC|                 BCh|    AO|    AH|    AL|    AC|                 ACh|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
|2005-05-02 00:00:00|2005-05-02|00:00|1.2852|1.2852| 1.284|1.2844|-7.99999999999911...|1.2854|1.2854|1.2842|1.2846|-8.00000000000133...|
|2005-05-02 01:00:00|2005-05-02|01:00|1.2844|1.2848|1.2839|1.2842|-1.99999999999978E-4|1.2846| 1.285|1.2841|1.2844|-1.99999999999978E-4|
|2005-05-02 02:00:00|2005-05-02|02:00|1.2843|1.2854|1.2841|1.2851|7.999999999999119E-4|1.2845|1.2856|1.2843|1.2853|8.000000000001339E-4|
+-------------------+----------+-----+------+------+------+------+--------------------+------+------+------+------+--------------------+
only showing top 3 rows



In [25]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import date_format

selected_date = "2019-09-15"

# Filter the DataFrame to select data for the specified date (we picked a random Sunday)
filtered_df = df.filter(df['Date'] == selected_date)

# Show the filtered data
filtered_df.show()

hour_range = filtered_df.select(min('Time'), max('Time')).first()

# Extract the minimum and maximum values
min_hour = hour_range[0]
max_hour = hour_range[1]

day_of_week = filtered_df.select(date_format('Date', 'EEEE').alias('DayOfWeek')).first()

# Extract the day of the week
day_of_week = day_of_week['DayOfWeek']

# Display the day of the week
print(f"The selected date ({selected_date}) was a {day_of_week}.")

# Display the hour range
print(f"Hour range for {selected_date}:         {min_hour}      to      {max_hour}")

+-------------------+----------+-----+-------+-------+-------+-------+--------------------+------------------+-------+-------+-------+--------------------+
|           DateTime|      Date| Time|     BO|     BH|     BL|     BC|                 BCh|                AO|     AH|     AL|     AC|                 ACh|
+-------------------+----------+-----+-------+-------+-------+-------+--------------------+------------------+-------+-------+-------+--------------------+
|2019-09-15 21:00:00|2019-09-15|21:00|1.10727|1.10743|1.10678|1.10712|-1.49999999999872...|           1.10768| 1.1079| 1.1072|1.10749|-1.90000000000134...|
|2019-09-15 22:00:00|2019-09-15|22:00|1.10712|1.10794|1.10665|1.10785|7.299999999998974E-4|           1.10755|1.10811|1.10734|1.10801|4.599999999999049E-4|
|2019-09-15 23:00:00|2019-09-15|23:00|1.10786|1.10858|1.10783|1.10825|3.899999999998904E-4|1.1080299999999998|1.10871|1.10798|1.10839| 3.60000000000138E-4|
+-------------------+----------+-----+-------+-------+-------+--

In [26]:
selected_date = "2019-09-13"


# Filter the DataFrame to select data for the specified date (we picked a random Sunday)
filtered_df = df.filter(df['Date'] == selected_date)

# Show the filtered data
filtered_df.show()

hour_range = filtered_df.select(min('Time'), max('Time')).first()

# Extract the minimum and maximum values
min_hour = hour_range[0]
max_hour = hour_range[1]

day_of_week = filtered_df.select(date_format('Date', 'EEEE').alias('DayOfWeek')).first()

# Extract the day of the week
day_of_week = day_of_week['DayOfWeek']

# Display the day of the week
print(f"The selected date ({selected_date}) was a {day_of_week}.")

# Display the hour range
print(f"Hour range for {selected_date}:         {min_hour}      to      {max_hour}")

+-------------------+----------+-----+------------------+------------------+------------------+------------------+--------------------+------------------+-------+------------------+------------------+--------------------+
|           DateTime|      Date| Time|                BO|                BH|                BL|                BC|                 BCh|                AO|     AH|                AL|                AC|                 ACh|
+-------------------+----------+-----+------------------+------------------+------------------+------------------+--------------------+------------------+-------+------------------+------------------+--------------------+
|2019-09-13 00:00:00|2019-09-13|00:00|1.1055700000000002|           1.10643|1.1055700000000002|           1.10636|7.899999999998464E-4|           1.10573|1.10656|           1.10572|           1.10649|7.600000000000938E-4|
|2019-09-13 01:00:00|2019-09-13|01:00|           1.10635|           1.10658|           1.10622|           1.1064

In [27]:
selected_date = "2019-09-12"

# Filter the DataFrame to select data for the specified date (we picked a random Sunday)
filtered_df = df.filter(df['Date'] == selected_date)

# Show the filtered data
filtered_df.show()

hour_range = filtered_df.select(min('Time'), max('Time')).first()

# Extract the minimum and maximum values
min_hour = hour_range[0]
max_hour = hour_range[1]

day_of_week = filtered_df.select(date_format('Date', 'EEEE').alias('DayOfWeek')).first()

# Extract the day of the week
day_of_week = day_of_week['DayOfWeek']

# Display the day of the week
print(f"The selected date ({selected_date}) was a {day_of_week}.")

# Display the hour range
print(f"Hour range for {selected_date}:         {min_hour}      to      {max_hour}")

+-------------------+----------+-----+------------------+------------------+-------+------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+
|           DateTime|      Date| Time|                BO|                BH|     BL|                BC|                 BCh|                AO|                AH|                AL|                AC|                 ACh|
+-------------------+----------+-----+------------------+------------------+-------+------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+
|2019-09-12 00:00:00|2019-09-12|00:00|           1.10131|1.1013700000000002|1.10074|             1.101|-3.10000000000032...|           1.10144|           1.10149|           1.10086|           1.10114|-2.99999999999966...|
|2019-09-12 01:00:00|2019-09-12|01:00|           1.10102|1.1014700000000002|1.10102|           1.10131|2.8999999

So apparently the Data Range for each week is from :

Sunday              from 21:00 to 23:00

Monday - Thursday   from 00:00 to 23:00

Friday              from 00:00 to 21:00

### Displaying the summary statistics generated in the previous step in a tabular format.

In [28]:
df.select("BO", "BH", "BL", "BC", "BCh", "AO", "AH", "AC", "ACh").describe().show()
# show summary statistics only for the numerical columns

+-------+-------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+--------------------+
|summary|                 BO|                BH|                BL|                BC|                 BCh|                 AO|                AH|                AC|                 ACh|
+-------+-------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+--------------------+
|  count|              93084|             93084|             93084|             93084|               93084|              93084|             93084|             93084|               93084|
|   mean| 1.2657340146534282|1.2667553338919628|1.2647362568217932| 1.265733121803963|-8.92849469295910...| 1.2659002723346635|1.2669171722315304|1.2658992623866556|-1.00994800395440...|
| stddev|0.12689441842290416|0.1270596992463785| 0.12672217286271

### Let's find the null values in the dataframe

In [29]:
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])

null_counts.show()
# apparently we have no nulls

+--------+----+----+---+---+---+---+---+---+---+---+---+---+
|DateTime|Date|Time| BO| BH| BL| BC|BCh| AO| AH| AL| AC|ACh|
+--------+----+----+---+---+---+---+---+---+---+---+---+---+
|       0|   0|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+--------+----+----+---+---+---+---+---+---+---+---+---+---+



In [30]:
# If we had nulls we could have used this statement and set the parameter how  to all 
# to drops rows that have the entire row has NULL values it will be dropped
df = df.na.drop(how="all")

In [31]:
df.count()

93084

In [32]:
# here we drop the row if it doesn't have 2 NON NULL values in the cells of the row
df = df.na.drop(how="all", thresh=2)

In [33]:
df = df.na.drop()

In [34]:
df.count()

93084

Well that's nice! we have 0 null values in our DataFrame!

### Lets check if we have 0 values in our pyspark dataframe

In [35]:
from pyspark.sql.types import DoubleType, IntegerType

# List of numeric column names
numeric_columns = [column for column in df.columns if df.schema[column].dataType in {DoubleType(), IntegerType()}]

# Check for 0 values in numeric columns
zero_counts = df.select([sum((col(column) == 0).cast("int")).alias(column) for column in numeric_columns])

zero_counts.show()

+---+---+---+---+---+---+---+---+---+---+
| BO| BH| BL| BC|BCh| AO| AH| AL| AC|ACh|
+---+---+---+---+---+---+---+---+---+---+
|  0|  0|  0|  0|692|  0|  0|  0|  0|674|
+---+---+---+---+---+---+---+---+---+---+



alternative way

In [36]:
from pyspark.sql import functions as F

# Select only numeric columns
numeric_columns = ['BO', 'BH', 'BL', 'BC', 'BCh', 'AO', 'AH', 'AL', 'AC', 'ACh']

# Check for 0 values in the numeric columns
zero_counts = df.select([F.sum(F.when(df[column] == 0, 1).otherwise(0)).alias(column) for column in numeric_columns])

# Show the result
zero_counts.show()

+---+---+---+---+---+---+---+---+---+---+
| BO| BH| BL| BC|BCh| AO| AH| AL| AC|ACh|
+---+---+---+---+---+---+---+---+---+---+
|  0|  0|  0|  0|692|  0|  0|  0|  0|674|
+---+---+---+---+---+---+---+---+---+---+



In [37]:
# displaying the rows that have 0 values in BCh or Ach columns
filtered_df = df.filter((col('BCh') == 0) | (col('ACh') == 0))

filtered_df.show(10)

+-------------------+----------+-----+------------------+------------------+------------------+------------------+---+-------+-------+------------------+-------+---+
|           DateTime|      Date| Time|                BO|                BH|                BL|                BC|BCh|     AO|     AH|                AL|     AC|ACh|
+-------------------+----------+-----+------------------+------------------+------------------+------------------+---+-------+-------+------------------+-------+---+
|2005-05-02 03:00:00|2005-05-02|03:00|            1.2851|            1.2859|             1.285|            1.2851|0.0| 1.2853| 1.2861|            1.2852| 1.2853|0.0|
|2005-05-02 05:00:00|2005-05-02|05:00|            1.2854|            1.2858|            1.2853|            1.2854|0.0| 1.2856|  1.286|            1.2855| 1.2856|0.0|
|2005-05-03 02:00:00|2005-05-03|02:00|            1.2833|            1.2836|            1.2831|            1.2833|0.0| 1.2835| 1.2838|            1.2833| 1.2835|0.0|
|200

### Note on 0 Values in Bid Change (BCh) and Ask Change (ACh) Columns

In the Forex market, the Bid Open (BO) and Bid Close (BC) columns, as well as the Ask Open (AO) and Ask Close (AC) columns, represent the opening and closing prices for bid and ask orders, respectively. In instances where these values are equal, it results in a Bid Change (BCh) and Ask Change (ACh) of 0.

This occurrence indicates a state of price stability where there is no change in the bid or ask prices throughout the time interval. In financial terms, this reflects a period where the opening and closing prices remain constant, signaling a lack of fluctuation in the market during that specific time frame.

While 0 values in BCh and ACh might seem unusual, they are a valid representation of price stability, and it is a common scenario in financial data analysis. Therefore, for the purpose of this analysis, these 0 values will not be modified, as they accurately capture the absence of price changes during certain intervals.

### Lets check if we have empty strings in our pyspark dataframe:

In [46]:
from pyspark.sql.functions import when

# Selecting all columns from the DataFrame
columns = df.columns

# Checking for empty strings in each column
empty_string_counts = df.select([sum(when(col(column) == "", 1).otherwise(0)).alias(column) for column in columns])

# Displaying the result
empty_string_counts.show()

+--------+----+----+---+---+---+---+---+---+---+---+---+---+
|DateTime|Date|Time| BO| BH| BL| BC|BCh| AO| AH| AL| AC|ACh|
+--------+----+----+---+---+---+---+---+---+---+---+---+---+
|       0|   0|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+--------+----+----+---+---+---+---+---+---+---+---+---+---+



Great we do not have empty strings in our dataframe

## Data and Column Manipulation

In [38]:
# df = df.withColumn("PriceRange", col("BH") - col("BL"))
# # we can also selected multiple columns
# df.select(['Datetime','Date','Time','BO']).show(3)

In [39]:
# Creating New Columns and Explain Why
# Let's assume you want to calculate the daily price range as a new column
# Price range = High Price (BH) - Low Price (BL)

# df = df.withColumn("PriceRange", col("BH") - col("BL"))
# df.show(5)