# 1. Initializing the workspace for data preparation. I will use PySpark for this task.

In [1]:
from pyspark.sql import SparkSession, types, functions
import pandas as pd
import matplotlib.pyplot as plt
import os

#### 2. We need to start a SparkSession in order to use PySpark.

In [2]:
# Get the Spark session for this notebook (getOrCreate builds it on first use).
spark = (
    SparkSession.builder
    .appName("data_cleaning_and_exploration")
    .getOrCreate()
)

#### 3. Creating two functions to read all the files

In [4]:
# Absolute Windows path of the folder that holds the CSV files (machine-specific).
local_path = r"C:\Users\yusuf\OneDrive\Desktop\Forex Project"
# The files are all located in the same folder, so access is simplified by collecting just their file names (paths relative to local_path).
def relative_path_getter(local_path):
    """Return the names of the regular ``.csv`` files directly inside ``local_path``.

    Only bare file names are returned (not full paths), in the order
    ``os.listdir`` yields them. Sub-directories are not searched, and a
    directory whose name ends in ``.csv`` is not included.
    """
    csv_files = []
    for entry in os.listdir(local_path):
        if not entry.endswith(".csv"):
            continue
        # Keep regular files only; skip directories that merely look like CSVs.
        if os.path.isfile(os.path.join(local_path, entry)):
            csv_files.append(entry)
    return csv_files
def df_creator(csv_files=None, local_path=""):
    """Read each CSV file into a Spark DataFrame using one fixed schema.

    Parameters:
        csv_files: iterable of CSV file names, relative to ``local_path``.
        local_path: directory that contains the files.

    Returns:
        A dict mapping each file name without its extension to the DataFrame
        read from that file. When ``csv_files`` is ``None`` a plain message
        string is returned instead; this legacy behaviour is kept so that
        existing callers see no change.

    The files are read with the module-level ``spark`` session.
    """
    # Identity check: `== None` would dispatch to the argument's own __eq__.
    if csv_files is None:
        return "there is no file to process for dataframe creation"

    # Every file is read with the same six-column schema; all fields are nullable.
    schema = types.StructType([
        types.StructField("Time", types.TimestampType(), True),
        types.StructField("Open", types.DoubleType(), True),
        types.StructField("High", types.DoubleType(), True),
        types.StructField("Low", types.DoubleType(), True),
        types.StructField("Close", types.DoubleType(), True),
        types.StructField("Volume", types.IntegerType(), True),
    ])

    # Key each DataFrame by the file name stripped of its extension.
    return {
        os.path.splitext(file)[0]: spark.read.csv(
            os.path.join(local_path, file), header=True, schema=schema
        )
        for file in csv_files
    }
# List the CSV file names found in local_path, then read each one into a Spark DataFrame.
csv_file_names = relative_path_getter(local_path)
dfs = df_creator(csv_file_names, local_path)
# Bare expression so the notebook shows the resulting mapping as the cell output.
dfs

{'EURUSD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'GBPUSD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'USDCAD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'USDCHF_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int]}

#### 4. Data exploration and Analysis

In [None]:
# It turns out that PySpark is not directly suited to the classical visualization libraries, so we need to convert the data to pandas,
# or skip that step and do the cleaning directly on the Spark data: clean and prepare it first, then visualize it by creating pandas DataFrame copies of it.
# This means we are going to do the data cleaning first and the visualization later on. I think feature engineering is not really possible, since there are only a few columns and the task is clear.
def cleaner(files):
    """Drop duplicate rows and mean-fill nulls in every DataFrame of ``files``.

    Parameters:
        files: dict mapping a name to a Spark DataFrame.

    Returns:
        The same ``files`` dict, updated in place so that each value is the
        cleaned DataFrame.

    Only numeric columns are imputed. Columns of other types (for example the
    timestamp column) have no usable mean and are left untouched.
    """
    for name, frame in files.items():
        frame = frame.dropDuplicates()

        # Pick the imputable columns from the schema instead of attempting the
        # mean on every column and silently swallowing whatever fails.
        numeric_cols = [
            field.name
            for field in frame.schema.fields
            if isinstance(field.dataType, types.NumericType)
        ]

        if numeric_cols:
            # One aggregation pass computes every mean, rather than one Spark
            # job per column. Names are backtick-quoted so that special
            # characters (such as dots) are taken literally.
            means_row = frame.agg(*[
                functions.mean(
                    functions.col("`" + col_name.replace("`", "``") + "`")
                )
                for col_name in numeric_cols
            ]).first()

            # A mean of None means the column has no non-null values, so there
            # is nothing to fill it with.
            mean_vals = {
                col_name: mean
                for col_name, mean in zip(numeric_cols, means_row)
                if mean is not None
            }
            if mean_vals:
                frame = frame.fillna(mean_vals)

        # Replacing the value of an existing key is safe while iterating.
        files[name] = frame
    return files
# Now it is time to add a feature-engineering function so that we can create new features and train an even better model!
def feature_engineering(clean_files) -> dict:
    """Placeholder for the feature-engineering step; not implemented yet.

    Currently ignores ``clean_files`` and returns ``None``.
    """
    return None
# I am also thinking of adding a visualization function once all the processing of the data is done -- or should I be comparing against the data beforehand as well?
# Run the cleaning step on the loaded DataFrames; cleaner also updates dfs in place.
cleaned_files = cleaner(dfs)

{'EURUSD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'GBPUSD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'USDCAD_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int],
 'USDCHF_M1': DataFrame[Time: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int]}

In [4]:
# Stop the Spark session now that the work is finished.
spark.stop()