In [0]:
%pyspark
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import regexp_replace, col, hour, to_timestamp, udf
from pyspark.sql.functions import year, month, dayofmonth, hour

In [1]:
%pyspark
# Load the raw crypto snapshot files from the Hive warehouse directory and
# preview the first rows.
crypto_parquet_glob = "/user/hive/warehouse/crypto_data1/part-*.parquet"
parquet_df = spark.read.parquet(crypto_parquet_glob)
parquet_df.show(n=20, truncate=True)

In [2]:
%pyspark
# Total number of rows in the loaded DataFrame.
parquet_df.count()

In [3]:
%pyspark
# Data Cleaning: convert Price to numeric.
# Strip "$" and "," from Price, then cast to DOUBLE.
# - The raw string keeps the regex identical ([\$,]) without an invalid Python
#   escape sequence (SyntaxWarning on Python 3.12+).
# - DOUBLE instead of FLOAT: float32 keeps only ~7 significant digits, which
#   is too coarse for prices that are later averaged and used as an ML label.
parquet_df = parquet_df.withColumn("Price", regexp_replace(col("Price"), r"[\$,]", "").cast("double"))
parquet_df.show()

In [4]:
%pyspark
# Clean 24H_VOLUME and Market_Cap: strip "$" and "," and expand K/M/B/T
# magnitude suffixes, so "1.5B" and "1.5M" are no longer both read as 1.5.
def clean_currency(x):
    """Parse a formatted currency string such as "$1,234.5M" into a float.

    Args:
        x: the raw cell value. None (a SQL NULL passed into the UDF) or a
            blank string yields None instead of failing the Spark task.

    Returns:
        The numeric amount with a trailing K/M/B/T suffix (case-insensitive)
        expanded, e.g. "$1.5B" -> 1.5e9, or None for missing input.

    Raises:
        ValueError: if what remains after stripping is not a number.
    """
    suffix_scale = {"K": 1e3, "M": 1e6, "B": 1e9, "T": 1e12}
    if x is None:
        return None
    text = str(x).replace(",", "").replace("$", "").strip()
    if not text:
        return None
    scale = 1.0
    suffix = text[-1].upper()
    if suffix in suffix_scale:
        scale = suffix_scale[suffix]
        text = text[:-1].strip()
    return float(text) * scale

# Register the parser as a UDF with an explicit DOUBLE return type. Without a
# returnType, PySpark's udf() defaults to StringType, so the "cleaned" volume
# and market-cap columns would still be strings.
clean_currency_udf = udf(clean_currency, "double")
parquet_df = parquet_df.withColumn("24H_VOLUME", clean_currency_udf(col("24H_VOLUME")))
parquet_df = parquet_df.withColumn("Market_Cap", clean_currency_udf(col("Market_Cap")))

In [5]:
%pyspark
# Preview the first 20 rows (long values truncated) after the currency cleanup.
parquet_df.show(n=20, truncate=True)

In [6]:
%pyspark
# Function to clean 24H_CHANGE column, e.g. "+1.25%" -> 1.25 and "-0.8%" -> -0.8
def clean_24h_change(value):
    """Parse a signed percentage string into a float (in percent units, not a fraction).

    Args:
        value: the raw 24H_CHANGE cell. None (a SQL NULL handed to the UDF)
            or a blank string yields None instead of failing the Spark task.

    Returns:
        The change as a float, negative when the text contains '-', or None.

    Raises:
        ValueError: if the text is not a number once '+', '-' and '%' are removed.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    magnitude = float(text.replace('+', '').replace('-', '').replace('%', ''))
    # As before, a '-' anywhere in the text marks the value as negative.
    return -magnitude if '-' in text else magnitude

In [7]:
%pyspark
# Apply the 24H_CHANGE parser as a UDF with an explicit DOUBLE return type.
# PySpark's udf() defaults to StringType, which would leave
# 24H_CHANGE_CLEANED as text despite its name.
clean_24h_change_udf = udf(clean_24h_change, "double")

parquet_df = parquet_df.withColumn("24H_CHANGE_CLEANED", clean_24h_change_udf(col("24H_CHANGE")))

In [8]:
%pyspark
# Preview the first 20 rows, including the new 24H_CHANGE_CLEANED column.
parquet_df.show(n=20, truncate=True)

In [9]:
%pyspark
# Drop the previous 24H_CHANGE column; its parsed version is kept in 24H_CHANGE_CLEANED.
parquet_df = parquet_df.drop("24H_CHANGE")


In [10]:
%pyspark
# Preview the first 20 rows now that the raw 24H_CHANGE column is gone.
parquet_df.show(n=20, truncate=True)

In [11]:
%pyspark
# Parse Datetime into a timestamp, then add one integer column per calendar
# component (Year, Month, Day-of-month, Hour) derived from it.
parquet_df = parquet_df.withColumn("Datetime", to_timestamp("Datetime"))

datetime_parts = (
    ("Year", year),
    ("Month", month),
    ("Day", dayofmonth),
    ("Hour", hour),
)
for part_name, extractor in datetime_parts:
    parquet_df = parquet_df.withColumn(part_name, extractor("Datetime"))

In [12]:
%pyspark
# Preview the first 20 rows with the extracted calendar columns.
parquet_df.show(n=20, truncate=True)

In [13]:
%pyspark
# Datetime is dropped once Year/Month/Day/Hour have been extracted from it.
parquet_df = parquet_df.drop("Datetime")

In [14]:
%pyspark
# Preview the first 20 rows of the cleaned dataset.
parquet_df.show(n=20, truncate=True)

In [15]:
%sh
# Install matplotlib for the plotting paragraphs below.
# NOTE(review): this uses whichever `pip` is first on the server's PATH, which
# may not belong to the Python used by the %pyspark interpreter, and the
# version is unpinned — consider pinning it for reproducible runs.
pip install matplotlib

In [16]:
%pyspark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import io
import base64

In [17]:
%pyspark


# Mean BTC price per calendar day, rendered as an inline PNG via %html.
# Rows are grouped by the full date (Year, Month, Day). Grouping on Day alone
# (day-of-month) would merge e.g. the 5th of two different months into one
# point whenever the data spans more than a month.
btc_df = parquet_df.filter(parquet_df['Name'] == 'BTC')

mean_value_each_day = (
    btc_df.dropna(subset=['Year', 'Month', 'Day'])
    .groupBy('Year', 'Month', 'Day')
    .avg('Price')
    .orderBy('Year', 'Month', 'Day')
    .toPandas()
)
# "YYYY-MM-DD" labels for a categorical x-axis (rows are already in date order).
day_labels = (
    mean_value_each_day['Year'].astype(str)
    + '-' + mean_value_each_day['Month'].astype(str).str.zfill(2)
    + '-' + mean_value_each_day['Day'].astype(str).str.zfill(2)
)

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(day_labels, mean_value_each_day['avg(Price)'], marker='o')
ax.set_xlabel('Day')
ax.set_ylabel('Mean BTC Price')
ax.set_title('Mean BTC Price for Each Day')
ax.grid(True)
fig.autofmt_xdate()  # rotate the date labels so they do not overlap

# Render the figure to PNG in memory and release it.
img = io.BytesIO()
fig.savefig(img, format='png')
plt.close(fig)

# Base64-encode the PNG and emit it through Zeppelin's %html display.
image_data = base64.b64encode(img.getvalue()).decode()
image_html = f'<img src="data:image/png;base64,{image_data}" alt="Mean BTC Price">'
print("%html " + image_html)


In [18]:
%pyspark

# Mean daily price of each selected coin, one bar chart per coin, rendered as
# an inline PNG via %html.
# Rows are grouped by the full date (Year, Month, Day). Grouping on Day alone
# would merge the same day-of-month from different months.
selected_names = ['BTC', 'ETH', 'XRP', 'LTC']
selected_df = parquet_df.filter(parquet_df['Name'].isin(selected_names))

mean_price_each_day = (
    selected_df.dropna(subset=['Year', 'Month', 'Day'])
    .groupBy('Name', 'Year', 'Month', 'Day')
    .avg('Price')
    .orderBy('Name', 'Year', 'Month', 'Day')
    .toPandas()
)
# "YYYY-MM-DD" labels for the categorical x-axis.
mean_price_each_day['Date'] = (
    mean_price_each_day['Year'].astype(str)
    + '-' + mean_price_each_day['Month'].astype(str).str.zfill(2)
    + '-' + mean_price_each_day['Day'].astype(str).str.zfill(2)
)

# Two panels per row. The grid grows with the number of coins instead of
# being fixed at 2x2, which raised IndexError for more than four names.
n_rows = max(1, (len(selected_names) + 1) // 2)
fig, axs = plt.subplots(n_rows, 2, figsize=(12, 4 * n_rows), squeeze=False)
axs = axs.flatten()

for ax, name in zip(axs, selected_names):
    data = mean_price_each_day[mean_price_each_day['Name'] == name]
    ax.bar(data['Date'], data['avg(Price)'])
    ax.set_title(f"Mean Price of {name}")
    ax.set_xlabel('Day')
    ax.set_ylabel('Mean Price')
    ax.tick_params(axis='x', labelrotation=45)

# Hide a leftover empty panel when the number of coins is odd.
for ax in axs[len(selected_names):]:
    ax.set_visible(False)

# Lay out, render to PNG in memory, and release the figure.
fig.tight_layout()
img = io.BytesIO()
fig.savefig(img, format='png')
plt.close(fig)

# Base64-encode the PNG and emit it through Zeppelin's %html display.
image_data = base64.b64encode(img.getvalue()).decode()
image_html = f'<img src="data:image/png;base64,{image_data}" alt="Mean Cryptocurrency Prices">'
print("%html " + image_html)


In [19]:
%pyspark

# Horizontal bar chart of the average price of every coin, sorted ascending so
# the highest-priced coin ends up at the top; rendered inline via %html.
mean_price_each_name = (
    parquet_df
    .groupBy('Name')
    .avg('Price')
    .orderBy('avg(Price)')
    .toPandas()
)

fig2, ax2 = plt.subplots(figsize=(12, 12))
ax2.barh(mean_price_each_name['Name'], mean_price_each_name['avg(Price)'], color='skyblue')
ax2.set_xlabel('Mean Price')
ax2.set_title('Mean Price of Each Cryptocurrency')
ax2.grid(axis='x')

# Render to an in-memory PNG, then free the figure.
img2 = io.BytesIO()
fig2.savefig(img2, format='png')
plt.close(fig2)

# Base64 data URI wrapped in an <img> tag for Zeppelin's %html display.
img2.seek(0)
image_data2 = base64.b64encode(img2.getvalue()).decode()
image_html2 = f'<img src="data:image/png;base64,{image_data2}" alt="Mean Price of Each Cryptocurrency">'
print("%html " + image_html2)


In [20]:
%pyspark
# Print the current schema to check the column types produced by the cleaning steps above.
parquet_df.printSchema()

In [21]:
%pyspark
# Volume, 24h change and market cap are not among the model features
# (Year/Month/Day/Hour), so drop all three in one call.
parquet_df = parquet_df.drop("24H_VOLUME", "24H_CHANGE_CLEANED", "Market_Cap")

In [22]:
%pyspark
# Preview the first 20 rows of the reduced column set.
parquet_df.show(n=20, truncate=True)

In [23]:
%pyspark
# Keep only Bitcoin rows (Name == 'BTC') and preview them.
bitcoin_df = parquet_df.where(col('Name') == 'BTC')
bitcoin_df.show(n=20, truncate=True)

In [24]:
%pyspark
# Number of BTC rows available for modelling.
bitcoin_df.count()

In [25]:
%pyspark
# Every remaining row is BTC (filtered above), so Name is constant; drop it.
bitcoin_df = bitcoin_df.drop("Name")

In [26]:
%pyspark
# Preview the first 20 BTC rows that feed the models.
bitcoin_df.show(n=20, truncate=True)

### Machine Learning


In [28]:
%pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [29]:
%pyspark
# Pack the calendar columns into the single 'features' vector column that the
# Spark ML regressors read.
feature_columns = ['Year', 'Month', 'Day', 'Hour']
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
assembled_data = assembler.transform(bitcoin_df)

In [30]:
%pyspark
# Split data into training and testing sets chronologically: roughly the
# latest 20% of observations (ordered by Year/Month/Day/Hour) form the test set.
# A random split scatters test hours between training hours, so the models
# only interpolate and the RMSE overstates how well they predict later data.
TEST_FRACTION = 0.2

# Sortable integer key YYYYMMDDHH; cast to long so the arithmetic cannot overflow int.
time_key = (
    col('Year').cast('long') * 1000000
    + col('Month') * 10000
    + col('Day') * 100
    + col('Hour')
)
keyed_data = assembled_data.withColumn('_time_key', time_key)

# relativeError=0.0 asks Spark for the exact quantile.
cutoff_quantiles = keyed_data.approxQuantile('_time_key', [1.0 - TEST_FRACTION], 0.0)
if not cutoff_quantiles:
    # Guard against an empty result (e.g. no rows with a complete time key).
    raise ValueError("No rows with a complete Year/Month/Day/Hour to split on")
time_cutoff = cutoff_quantiles[0]

train_data = keyed_data.filter(col('_time_key') <= time_cutoff).drop('_time_key')
test_data = keyed_data.filter(col('_time_key') > time_cutoff).drop('_time_key')

In [31]:
%pyspark
# Initialize regressors that predict Price from the 'features' vector.
# The tree-based estimators get an explicit seed. In PySpark's HasSeed mixin
# the default seed is hash() of the class name, and str hashing is randomized
# per Python process unless PYTHONHASHSEED is fixed. Without a seed, e.g.
# Random Forest's bootstrap/feature sampling would differ between sessions.
MODEL_SEED = 42

lr = LinearRegression(labelCol='Price', featuresCol='features')
dt = DecisionTreeRegressor(labelCol='Price', featuresCol='features', seed=MODEL_SEED)
rf = RandomForestRegressor(labelCol='Price', featuresCol='features', seed=MODEL_SEED)
gbt = GBTRegressor(labelCol='Price', featuresCol='features', seed=MODEL_SEED)

In [32]:
%pyspark
# Fit each regressor on the training split, starting with linear regression.
lr_model = lr.fit(dataset=train_data)

In [33]:
%pyspark
# Decision tree fit on the same training split.
dt_model = dt.fit(dataset=train_data)

In [34]:
%pyspark
# Random forest fit on the same training split.
rf_model = rf.fit(dataset=train_data)

In [35]:
%pyspark
# Gradient-boosted trees fit on the same training split.
gbt_model = gbt.fit(dataset=train_data)

In [36]:
%pyspark
# Score the held-out test set with every fitted model (adds a 'prediction' column).
lr_predictions, dt_predictions, rf_predictions, gbt_predictions = [
    fitted.transform(test_data)
    for fitted in (lr_model, dt_model, rf_model, gbt_model)
]

In [37]:
%pyspark
# Evaluate models: one RMSE evaluator comparing 'prediction' against the 'Price' label.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='Price',
    predictionCol='prediction',
)

In [38]:
%pyspark
# Compute the test-set RMSE for each model's predictions, in LR, DT, RF, GBT order.
lr_rmse, dt_rmse, rf_rmse, gbt_rmse = [
    evaluator.evaluate(preds)
    for preds in (lr_predictions, dt_predictions, rf_predictions, gbt_predictions)
]

In [39]:
%pyspark
# Report each model's test-set RMSE, one line per model.
rmse_report = (
    ("Linear Regression", lr_rmse),
    ("Decision Tree", dt_rmse),
    ("Random Forest", rf_rmse),
    ("GBT", gbt_rmse),
)
for model_label, rmse_value in rmse_report:
    print(f"{model_label} RMSE: {rmse_value}")

In [40]:
%pyspark
def get_best_model(lr_rmse, dt_rmse, rf_rmse, gbt_rmse):
    """Return the display name of the model with the lowest RMSE.

    NaN scores are skipped. Previously a NaN in the first position "won",
    because NaN never compares less than any value, so `min` kept it.
    Ties go to the earliest model in the order LR, DT, RF, GBT.

    Args:
        lr_rmse, dt_rmse, rf_rmse, gbt_rmse: RMSE of each model (lower is better).

    Returns:
        One of 'Linear Regression', 'Decision Tree', 'Random Forest', 'GBT'.

    Raises:
        ValueError: if every score is NaN.
    """
    import math

    models = {
        'Linear Regression': lr_rmse,
        'Decision Tree': dt_rmse,
        'Random Forest': rf_rmse,
        'GBT': gbt_rmse,
    }
    valid_scores = {name: score for name, score in models.items() if not math.isnan(score)}
    if not valid_scores:
        raise ValueError("All model RMSE values are NaN; cannot choose a best model")
    return min(valid_scores, key=valid_scores.get)

In [41]:
%pyspark
# Pick the lowest-RMSE model by name and announce it.
best_model_name = get_best_model(
    lr_rmse=lr_rmse,
    dt_rmse=dt_rmse,
    rf_rmse=rf_rmse,
    gbt_rmse=gbt_rmse,
)
print("The best model is: " + best_model_name)

## Save best model

In [43]:
%pyspark
import json
import os
from datetime import datetime

In [44]:
%pyspark
import subprocess

# Record which model won and when, as a small JSON file uploaded into the
# Hive table's HDFS directory.
# A single timestamp is used for both training_date and the file name so the
# two always agree (two datetime.now() calls could differ).
run_time = datetime.now()
model_info = {
    "name": best_model_name,
    "training_date": str(run_time)
}

# Unique file name per run; `timestamp` is also used to name the saved model folder.
timestamp = run_time.strftime("%Y%m%d%H%M%S")
file_name = f"best_model_info_{timestamp}.json"

# HDFS directory backing the Hive table.
hive_table_path = "/user/hive/warehouse/models_infos_table"

# Temporary local copy of the JSON file (local filesystem).
local_file_path = f"/tmp/{file_name}"

with open(local_file_path, "w") as json_file:
    json.dump(model_info, json_file)

try:
    # Argument list (no shell) plus check=True: the previous os.system call
    # ignored the exit status, so a failed upload went unnoticed.
    subprocess.run(
        ["hdfs", "dfs", "-put", local_file_path, f"{hive_table_path}/{file_name}"],
        check=True,
    )
finally:
    # Always remove the temporary local file, even if the upload failed.
    os.remove(local_file_path)


In [45]:
%pyspark
# Map the winning model name back to its fitted model object; any name not
# listed here falls back to the GBT model.
fitted_by_name = {
    'Random Forest': rf_model,
    'Decision Tree': dt_model,
    'Linear Regression': lr_model,
}
best_model = fitted_by_name.get(best_model_name, gbt_model)

In [46]:
%pyspark
from pyspark.ml import Pipeline, PipelineModel

# Persist the winning fitted model as a single-stage PipelineModel under
# "<model name>_<timestamp>" (same timestamp as the JSON info file).
# NOTE(review): the VectorAssembler is not a stage here, so whoever loads this
# model must build the 'features' column themselves. The path is also
# relative, so it resolves against the default filesystem's working
# directory — confirm both are intended.
model_folder = f"{best_model_name}_{timestamp}"
pipeline = PipelineModel(stages=[best_model])
pipeline.save(model_folder)