# Transform coin data using Spark

## This notebook was run and saved on Google Colab. Its output is written to a CSV file named 'Coins_Cleaned.csv' in the same folder.

## Install pyspark

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=eef528a8d154fda2eee4cc1ee84724e35029d8faf07a1802fe3dd3ec069c50d3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Import the necessary modules

In [None]:
import os
import sys

import pandas as pd

# Make Spark's Python processes use this kernel's interpreter so driver and workers agree on
# the Python version; set before any SparkSession is created so Spark picks the values up.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import explode, col, from_json, lit
from pyspark.sql.window import Window

## Process and combine the cryptocurrency data with PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, when, size

# Initialize Spark Session
spark = SparkSession.builder.appName("Coin Price Analysis").getOrCreate()

# Directory containing the input files crypto{FIRST_FILE_INDEX}.json .. crypto{LAST_FILE_INDEX}.json
path_to_json_directory = '/content/'
FIRST_FILE_INDEX = 1
LAST_FILE_INDEX = 95  # inclusive


def _load_json_file(json_file_path):
    """Parse one JSON file into a DataFrame whose columns are the document's top-level keys.

    wholeTextFiles yields one (path, content) pair per file, so each file's full text -- even
    if it spans many lines -- is handed to spark.read.json as a single JSON record.
    """
    file_texts = spark.sparkContext.wholeTextFiles(json_file_path).values()
    return spark.read.json(file_texts)


def _active_usd_quotes(file_df, currency):
    """Return one row per USD quote of the active listings stored under data.<currency>.

    Args:
        file_df: DataFrame from _load_json_file; its `data` struct holds one array of
            listings per currency key.
        currency: name of a field of the `data` struct.

    Returns:
        DataFrame with every field of quotes[].quote.USD, then quote_timestamp, id, name,
        symbol and is_active.
    """
    listings = file_df.select(
        explode(col(f"data.{currency}")).alias("details")
    ).select(
        col("details.quotes").alias("quotes"),
        col("details.id").alias("id"),
        col("details.name").alias("name"),
        col("details.symbol").alias("symbol"),
        col("details.is_active").alias("is_active"),
    )
    # Keep only active listings with a row filter: this avoids launching a separate count()
    # job per currency, and an inactive currency simply yields an empty frame.
    active_listings = listings.filter(col("is_active") == 1)
    return active_listings.select(
        explode(col("quotes")).alias("quotes"), "id", "name", "symbol", "is_active"
    ).select(
        col("quotes.quote.USD.*"),
        col("quotes.timestamp").alias("quote_timestamp"),
        "id",
        "name",
        "symbol",
        "is_active",
    )


# Initialise explicitly so the cell runs on a fresh kernel and each currency's rows are
# appended exactly once.
final_df = None
for file_index in range(FIRST_FILE_INDEX, LAST_FILE_INDEX + 1):
    df = _load_json_file(f"{path_to_json_directory}crypto{file_index}.json")

    # Each field of the top-level `data` struct is one currency's array of listings.
    for currency in df.select("data.*").columns:
        df_final = _active_usd_quotes(df, currency)
        # Union by column name: the quote.USD fields are schema-inferred per file, so a
        # positional union() could silently shift or fail when files differ in their fields.
        if final_df is None:
            final_df = df_final
        else:
            final_df = final_df.unionByName(df_final, allowMissingColumns=True)

if final_df is None:
    raise ValueError("No currency data found in the input JSON files.")

# Show the combined DataFrame
final_df.show(truncate=False)

+------------------+---------------------+-----------------+------------------+------------------+-----------------+------------------+------------------------+-----------------+--------------+------------------------+----+-------+------+---------+
|circulating_supply|market_cap           |percent_change_1h|percent_change_24h|percent_change_30d|percent_change_7d|price             |timestamp               |total_supply     |volume_24h    |quote_timestamp         |id  |name   |symbol|is_active|
+------------------+---------------------+-----------------+------------------+------------------+-----------------+------------------+------------------------+-----------------+--------------+------------------------+----+-------+------+---------+
|3.55695351669E10  |2.274831167346767E10 |-0.044694212696  |8.922506415286    |1.344609752466    |-16.353234856636 |0.6395448117814203|2024-03-21T00:00:00.000Z|3.671463594304E10|9.7186707046E8|2024-03-21T00:00:00.000Z|2010|Cardano|ADA   |1        |
|3.5

## Calculate a 7-day moving average of the price and add it as a new column

In [None]:
# Moving average (MA)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Length of the trailing moving-average window, in seconds (7 days).
MA_WINDOW_SECONDS = 7 * 24 * 60 * 60

# Convert the ISO 8601 time string to a timestamp, then to epoch seconds so the window can be
# defined as a time range rather than a row count.
df_sorted = final_df.withColumn("quote_timestamp", F.to_timestamp("quote_timestamp"))
df_sorted = df_sorted.withColumn("unix_timestamp", F.unix_timestamp("quote_timestamp"))

# For each coin id, average the prices whose timestamp lies in (t - 7 days, t]. A range frame
# stays exactly 7 days wide even when some quotes are missing, whereas a fixed row count
# (e.g. rowsBetween(-168, 0), i.e. 169 rows) would stretch past 7 days over gaps.
windowSpec = Window.partitionBy("id") \
                   .orderBy("unix_timestamp") \
                   .rangeBetween(-(MA_WINDOW_SECONDS - 1), Window.currentRow)

df_with_ma = df_sorted.withColumn("ma_7d", F.avg("price").over(windowSpec))
df_with_ma.show()

## Convert the Spark DataFrame to a Pandas DataFrame and save it to a pickle file.

In [None]:
# Pull the complete Spark result onto the driver as pandas and persist this batch as a
# pickle; a later cell combines it with the coins0.pkl and coins1.pkl batches.
batch_pickle_path = '/content/coins2.pkl'
pandas_df = df_with_ma.toPandas()
pandas_df.to_pickle(batch_pickle_path)

## Combine the saved batches and export them as a CSV file

In [None]:
import os
from pathlib import Path

import pandas as pd

# Directory holding the per-batch pickles (coins0.pkl .. coins2.pkl) downloaded from Colab.
# Override with the COINS_DATA_DIR environment variable instead of editing a local path.
DATA_DIR = Path(os.environ.get("COINS_DATA_DIR", "/Users/eva/Downloads"))
BATCH_INDICES = range(3)

# Stack all batches vertically in batch order with a fresh 0..n-1 index.
# NOTE: pd.read_pickle can execute arbitrary code -- only load pickles this notebook produced.
combined_df = pd.concat(
    [pd.read_pickle(DATA_DIR / f"coins{batch}.pkl") for batch in BATCH_INDICES],
    ignore_index=True,
)

combined_csv_file_path = str(DATA_DIR / 'Coins_Cleaned.csv')
combined_df.to_csv(combined_csv_file_path, index=False)