In [19]:
import os
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import to_date, col

# Initialize Spark session with custom configurations
spark = SparkSession.builder \
    .appName("DataIngestion") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print("Spark session initialized with custom configurations")

# Load the cleaned dataset into a Pandas DataFrame
df = pd.read_csv('cleaned_dataset.csv')
print("Data loaded into Pandas DataFrame")

# Convert the Pandas Data Frame to a Spark Data Frame
spark_df = spark.createDataFrame(df)
print("Pandas DataFrame converted to Spark DataFrame")

# Transform: Handle missing values (example: fill with 0 or mean)
spark_df = spark_df.fillna(0)
print("Missing values handled")

# Convert the date column to date type if applicable
spark_df = spark_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
print("Date column converted")

# Partition the data by year for better performance if applicable
spark_df = spark_df.repartitionByRange("date")
print("Data partitioned by date")

# Define output path and ensure the directory exists
output_path = os.path.expanduser("~/istanbul_stock_exchange")
os.makedirs(output_path, exist_ok=True)
print(f"Output path: {output_path}")

# Save the DataFrame to the specified path in Parquet format
spark_df.write.mode("overwrite").parquet(output_path)
print("Data saved to Parquet format")

# Read back the saved Parquet files to verify
spark_df_read = spark.read.parquet(output_path)
spark_df_read.show(5)
print("Data read back from Parquet files and displayed")

# Stop the Spark session
spark.stop()
print("Spark session stopped")

24/07/27 13:15:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark session initialized with custom configurations
Data loaded into Pandas DataFrame
Pandas DataFrame converted to Spark DataFrame
Missing values handled
Date column converted
Data partitioned by date
Output path: /home/b1519e89-0a35-4c07-9c0a-b03ec2293045/istanbul_stock_exchange
Data saved to Parquet format
+----------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+
|      date|TL BASED ISE|USD BASED ISE|          SP|         DAX|        FTSE|      NIKKEI|     BOVESPA|          EU|          EM|
+----------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+
|      NULL|         ISE|          ISE|          SP|         DAX|        FTSE|      NIKKEI|     BOVESPA|          EU|          EM|
|2009-01-05| 0.035753708|  0.038376187|-0.004679315| 0.002193419| 0.003894376|           0| 0.031190229| 0.012698039| 0.028524462|
|2009-01-06| 0.025425873|  0.0318

In [22]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("ParquetFile").getOrCreate()

# Update the file path to the correct location
directory_path = "/home/b1519e89-0a35-4c07-9c0a-b03ec2293045/istanbul_stock_exchange"
file_path = os.path.join(directory_path, "part-00000-ac3332bf-f7a6-434b-9ca2-90ae5f136c88-c000.snappy.parquet")

# Load Parquet file
df = spark.read.parquet(file_path)

# Show the dataframe
df.show()


+----------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+
|      date|TL BASED ISE|USD BASED ISE|          SP|         DAX|        FTSE|      NIKKEI|     BOVESPA|          EU|          EM|
+----------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+
|      NULL|         ISE|          ISE|          SP|         DAX|        FTSE|      NIKKEI|     BOVESPA|          EU|          EM|
|2009-01-05| 0.035753708|  0.038376187|-0.004679315| 0.002193419| 0.003894376|           0| 0.031190229| 0.012698039| 0.028524462|
|2009-01-06| 0.025425873|  0.031812743| 0.007786738| 0.008455341| 0.012865611| 0.004162452|  0.01891958| 0.011340652| 0.008772644|
|2009-01-07| -0.02886173| -0.026352966|-0.030469134|-0.017833062|-0.028734593| 0.017292932|-0.035898576|-0.017072795|-0.020015412|
|2009-01-08|-0.062208079| -0.084715902| 0.003391364|-0.011726277|-0.000465999|-0.04