<h1><center>Big Data Algorithms Techniques & Platforms</center></h1>
<h2>
<hr style=" border:none; height:3px;">
<center>Spark and DataFrames</center>
<hr style=" border:none; height:3px;">
</h2>

## Objectives

<strong> Dataframes: </strong>
<ul>
    <li>  Pyspark </li>
    <li>  Pandas library on Spark</li>
</ul>

# A. Context

<p align="justify">
<font size="3">
For this series of exercises we are going to use a fairly large dataset of Bitcoin data, made available on <a href="https://www.kaggle.com/mczielinski/bitcoin-historical-data">Kaggle</a>.

As stated in the description of the dataset:
"Bitcoin is the longest running and most well known cryptocurrency, first released as open source in 2009 by the anonymous Satoshi Nakamoto. Bitcoin serves as a decentralized medium of digital exchange, with transactions verified and recorded in a public distributed ledger (the blockchain) without the need for a trusted record keeping authority or central intermediary."
</font>
</p>


### The dataset

<p align="justify">
<font size="3">
The dataset is in a .csv file:

$btcusd\_1-min\_data.csv $

CSV files for select Bitcoin exchanges covering the period from January 2012 to March 2021, with minute-to-minute updates of OHLC (Open, High, Low, Close), volume in BTC and in the indicated currency, and the weighted Bitcoin price.

Notice that:
<ul>
    <li> Timestamps are in Unix time.</li>
<li> Timestamps without any trades or activity have their data fields filled with NaNs. </li>
<li>  If a timestamp is missing, or if there are jumps, this may be because the exchange (or its API) was down, the exchange (or its API) did not exist, or some other unforeseen technical error in data reporting or gathering. </li>
</ul>
As stated by the author, "all effort has been made to deduplicate entries and verify the contents are correct and complete to the best of my ability, but obviously trust at your own risk".
</font>
</p>
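The timestamps are Unix time, meaning seconds since 1970-01-01 00:00:00 UTC, so you can check any value by hand with the Python standard library. Below is a minimal sketch that uses the first timestamp appearing later in this notebook. The helper name `unix_to_utc` is ours and is only for illustration.

```python
from datetime import datetime, timezone

def unix_to_utc(seconds: float) -> datetime:
    """Convert a Unix timestamp (seconds since the epoch) to an aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

first_minute = unix_to_utc(1325412060)
print(first_minute.isoformat())  # 2012-01-01T10:01:00+00:00
```

Spark displays the same instant in its session time zone. That is likely why the Date_Time values later in the notebook can show a different hour, for example 11:01 on a machine set to UTC+1.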

In [1]:
# import of Pandas library
import pandas as pa


# B. Environment set-up

<p align="justify">
<font size="3">
As a first step, you must add the dataset to your environment.

You can follow the procedure that imports Kaggle data into the Colab working folders, or simply download the file


$btcusd\_1-min\_data.csv $
    
and upload it to the folder from which your notebook reads its input.

</font>
</p>

<p align="justify">
<font size="3">
As a second step, prepare your environment by running the following two cells, which:
<ul>
    <li> Import the Pandas library.</li>
<li> Set up the Spark environment and return a SparkSession (which plays the role that the SparkContext played in the previous exercises). </li>
</ul>    
    

</font>
</p>

In [2]:
#import of the SparkSession
from pyspark.sql import SparkSession

# initialization of the Spark session (getOrCreate reuses an existing session if there is one)
spark = SparkSession \
    .builder \
    .appName("tp2") \
    .getOrCreate()

print("initialization successful")



initialization successful


In [3]:
! pip install kaggle

! mkdir ~/.kaggle

! cp kaggle.json ~/.kaggle/

! chmod 600 ~/.kaggle/kaggle.json


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
mkdir: /Users/alexandregravereaux/.kaggle: File exists
cp: kaggle.json: No such file or directory
chmod: /Users/alexandregravereaux/.kaggle/kaggle.json: No such file or directory


In [8]:
import os

# Kaggle API credentials: replace the placeholders with your own values
os.environ['KAGGLE_USERNAME'] = 'username'
os.environ['KAGGLE_KEY'] = 'key'

! kaggle datasets download mczielinski/bitcoin-historical-data

Dataset URL: https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data
License(s): CC-BY-SA-4.0
Downloading bitcoin-historical-data.zip to /Users/alexandregravereaux/Desktop/sg8-cloud-computing-labs/lab2-spark
100%|██████████████████████████████████████| 95.4M/95.4M [00:00<00:00, 5.84GB/s]


In [9]:
! unzip -o bitcoin-historical-data.zip

Archive:  bitcoin-historical-data.zip
  inflating: btcusd_1-min_data.csv   


### <strong> Exercise 1.</strong> First import
    
<p align="justify">
<font size="3">
Import the CSV file into a Spark DataFrame. If in doubt, you can always refer to the Spark 3.1.1 documentation:

<a href="https://spark.apache.org/docs/3.1.1/">Spark Reference Documentation</a>

</font>
</p>

In [10]:
# Write the command that creates (reads) a Spark DataFrame and stores the reference in the dfs variable

#'''############## WRITE YOUR CODE HERE ##############'''
schema = "Timestamp double, Open double, High double, Low double, Close double, Volume double"
dfs = spark.read.option("header", "true").schema(schema).csv("btcusd_1-min_data.csv")
#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfs


#######################
# EXPECTED OUTPUT:
# DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume: double]
#
# Notice that if you have something like:
# DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string]
# you forgot some steps: the first line was not used as a header and no schema was given
#
# Notice also that if you have:
# DataFrame[Timestamp: string, Open: string, High: string, Low: string, Close: string, Volume_(BTC): string]
# you also forgot a step: the columns (including Timestamp) must be numeric, not String
###########################

DataFrame[Timestamp: double, Open: double, High: double, Low: double, Close: double, Volume: double]

## B.1  File import
    
<p align="justify">
<font size="3">
In this exercise the goal is to create a Spark DataFrame from the input CSV file.

Recall that in a Spark DataFrame the column types determine the internal data representation, so they matter.
    
For this step, the target set of typed columns is the following:
<ul>
    <li>    $Date\_Time: Timestamp$ </li>
     <li>   $Open: double$ </li>
     <li>   $High: double$ </li>
    <li>    $Low: double$ </li>
    <li>    $Close: double$ </li>
    <li>    $Volume: double$ </li>
</ul>
    
We will reach this schema in three guided steps, described in the following sections.
</font>
</p>

<p align="justify">
<font size="3">
Notice that the first line of the $csv$ file is a header with the column names, and that a plain import (without a schema) reads the Timestamp column, like every other column, as a String.
</font>
</p>

<p align="justify">
<font size="3">
In data import you must check that:
<ul>
    <li>  the types of the imported data (those read from the file by the operation you choose) match the types in the given schema</li>
    <li>  the column names match (apply transformations if necessary). </li>
</ul>
    
</font>
</p>

In [11]:
# the following command is going to show 5 rows of the DataFrame
dfs.take(5)

[Row(Timestamp=1325412060.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412120.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412180.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412240.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412300.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0)]

### <strong> Exercise 2. </strong> Timestamp column
    
<p align="justify">
<font size="3">
Refine the import of the CSV file and convert the "timestamp" column into the proper $Timestamp$ type:
    <ul>
        <li>   Create a new column <code>Date_Time</code> that is the conversion of the numeric column $Timestamp$ (Unix seconds) into the $Timestamp$ type  </li>
</ul>
DataFrames are immutable structures, so your procedure will use a command (discussed in the slides) that creates a new Spark $DataFrame$ with a different schema from the $dfs$ $DataFrame$.

</font>
</p>

<p align="justify">
<font size="3">
Compare the timestamp column in the CSV file with the one in the imported DataFrame.
</font>
</p>

<p align="justify">
<font size="3">
Look again at the target schema:
    
<ul>
    <li>    $Date\_Time: Timestamp$ </li>
     <li>   $Open: double$ </li>
     <li>   $High: double$ </li>
    <li>    $Low: double$ </li>
    <li>    $Close: double$ </li>
    <li>    $Volume: double$ </li>
</ul>
    
Notice that the imported data has three problems with respect to the target schema:
    
    
<ul>
    <li> the $Date\_Time$ column is not present in the original file </li>
    <li> there is a numeric $Timestamp$ column (Unix time) that can be converted into a $Timestamp$</li>
    <li> some of the column names contain unnecessary parentheses. </li>
</ul>     
</font>
</p>




In [12]:
# write the command that creates a new Spark DataFrame with a Date_Time column
# and stores the reference in the dfsdt variable (it must be a Spark DataFrame with a Date_Time column)
from pyspark.sql.functions import from_unixtime, col

#'''############## WRITE YOUR CODE HERE ##############'''

dfsdt = dfs.withColumn("Date_Time", from_unixtime(col("Timestamp")).cast('timestamp'))

#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfsdt

#######################
# EXPECTED OUTPUT:
# DataFrame[Timestamp: double, Open: double, High: double,
# Low: double, Close: double, Volume: double, Date_Time: timestamp]
#######################

DataFrame[Timestamp: double, Open: double, High: double, Low: double, Close: double, Volume: double, Date_Time: timestamp]

In [13]:
#show 5 rows of the DataFrame
dfsdt.take(5)

[Row(Timestamp=1325412060.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0, Date_Time=datetime.datetime(2012, 1, 1, 11, 1)),
 Row(Timestamp=1325412120.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0, Date_Time=datetime.datetime(2012, 1, 1, 11, 2)),
 Row(Timestamp=1325412180.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0, Date_Time=datetime.datetime(2012, 1, 1, 11, 3)),
 Row(Timestamp=1325412240.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0, Date_Time=datetime.datetime(2012, 1, 1, 11, 4)),
 Row(Timestamp=1325412300.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0, Date_Time=datetime.datetime(2012, 1, 1, 11, 5))]

### <strong> Exercise 3.</strong> Column names


<p align="justify">
<font size="3">
As you can see from the output of the previous exercise, some column names still contain unnecessary parentheses.
    <ul>
     <li> Remove the unnecessary parentheses from the column names </li>
     <li> Hint: look at the documentation of DataFrame API and check the operation for column renaming </li>
</ul>
</font>
</p>
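When several column names contain parentheses, such as Volume_(BTC) in the schema quoted in the Exercise 1 comments, writing one `withColumnRenamed` call per column becomes repetitive. The sketch below cleans all names at once with a regular expression. `clean_column_name` and `clean_column_names` are hypothetical helpers. `DataFrame.toDF(*names)` is the standard PySpark call that renames every column by position.

```python
import re

def clean_column_name(name: str) -> str:
    """Drop parentheses from a column name, e.g. 'Volume_(BTC)' -> 'Volume_BTC'."""
    return re.sub(r"[()]", "", name)

def clean_column_names(names):
    """Apply clean_column_name to every name, preserving the column order."""
    return [clean_column_name(n) for n in names]

# With a Spark DataFrame `df`, all columns can then be renamed in one step:
#     df = df.toDF(*clean_column_names(df.columns))
print(clean_column_names(["Timestamp", "Open", "Volume_(BTC)"]))
# ['Timestamp', 'Open', 'Volume_BTC']
```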


In [14]:
# write the command that creates a new Spark DataFrame with Volume_BTC as the name of Volume
# and store the reference in the dfscr variable (Spark DataFrame with Correct Names)

#'''############## WRITE YOUR CODE HERE ##############'''

dfscr = dfsdt.withColumnRenamed("Volume", "Volume_BTC")

#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfscr

#######################
# EXPECTED OUTPUT:
#DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double,
#          Volume_BTC: double]
#######################

DataFrame[Timestamp: double, Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Date_Time: timestamp]

In [9]:
#show 5 rows of the DataFrame
dfscr.show(5)

#######################
# Expected output:
#+------------+----+----+----+-----+----------+-------------------+
#|   Timestamp|Open|High| Low|Close|Volume_BTC|          Date_Time|
#+------------+----+----+----+-----+----------+-------------------+
#|1.32541206E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:01:00|
#|1.32541212E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:02:00|
#|1.32541218E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:03:00|
#|1.32541224E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:04:00|
#| 1.3254123E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:05:00|
#+------------+----+----+----+-----+----------+-------------------+#

+------------+----+----+----+-----+----------+-------------------+
|   Timestamp|Open|High| Low|Close|Volume_BTC|          Date_Time|
+------------+----+----+----+-----+----------+-------------------+
|1.32541206E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:01:00|
|1.32541212E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:02:00|
|1.32541218E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:03:00|
|1.32541224E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:04:00|
| 1.3254123E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:05:00|
+------------+----+----+----+-----+----------+-------------------+
only showing top 5 rows



## B.2 DataFrame columns


<p align="justify">
<font size="3">
    
In this part of the exercise we continue to modify the Spark DataFrame.

    
Remember that in PySpark you can access a DataFrame's columns either by attribute (<code>df.attributeName</code>) or by indexing (<code>df['attributeName']</code>).
</font>
</p>


<p align="justify">
<font size="3">
    
Look at the list of functions to get familiar with the documentation; some of them can help you manipulate the schema:
    
<ul>
     <li>    <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions">Spark Functions</a>.  </li>
</ul>    
    
    
</font>
</p>



### <strong> Exercise 4.</strong>  Add two new columns to the DataFrame
    
<p align="justify">
<font size="3">
We want to extend the DataFrame with two more columns: from the $Date\_Time$ column, create two new columns ($Year$ and $Month$) that contain
    <ul>
     <li> the year </li>
     <li> the month of the year </li>
</ul>
    
</font>
</p>
    
    
    

<p align="justify">
<font size="3">    
Look at the documentation of the Spark functions and find the two functions that are convenient for this use case (hint: the column names are a clue: <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions">Spark Functions</a>)
</font>
</p>
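To compare with the Pandas part of this lab, you can derive the same two columns in plain Pandas. The minimal sketch below uses two made-up Unix timestamps; the second one, 1328090460, is 2012-02-01 10:01:00 UTC. `pd.to_datetime(..., unit="s")` plays the role of the Timestamp conversion, and the `.dt` accessor plays the role of Spark's `year()` and `month()`.

```python
import pandas as pd

# Two made-up Unix timestamps (seconds since the epoch)
pdf = pd.DataFrame({"Timestamp": [1325412060, 1328090460]})

# unit="s" interprets the integers as seconds; the result is a naive datetime in UTC
pdf["Date_Time"] = pd.to_datetime(pdf["Timestamp"], unit="s")

# The .dt accessor extracts calendar fields, like Spark's year() and month()
pdf["Year"] = pdf["Date_Time"].dt.year
pdf["Month"] = pdf["Date_Time"].dt.month
print(pdf)
```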

In [15]:
#import the functions that you will use

############## WRITE YOUR CODE HERE ##############

from pyspark.sql.functions import year, month

############## END OF THE EXERCISE ##############

In [16]:
# write the command that creates a new Spark DataFrame with the two additional columns
# and store the reference in the dfsym variable (Spark DataFrame with Year and Month)

#'''############## WRITE YOUR CODE HERE ##############'''

dfsym = dfscr.withColumn("Year", year("Date_Time")) \
       .withColumn("Month", month("Date_Time"))

#'''############## END OF THE EXERCISE ##############'''




In [17]:
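# Drop the rows having Year or Month equal to null and store the result in dfsc

#'''############## WRITE YOUR CODE HERE ##############'''
# Note: without arguments, na.drop() removes rows with a null (or NaN) in ANY column;
# na.drop(subset=["Year", "Month"]) would restrict the check to Year and Month only
dfsc = dfsym.na.drop()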
#'''############## END OF THE EXERCISE ##############'''

dfsc.show(5)

#######################
# Expected output:
#+----+----+----+-----+----------+-------------------+----+-----+
#|Open|High| Low|Close|Volume_BTC|          Date_Time|Year|Month|
#+----+----+----+-----+----------+-------------------+----+-----+
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:01:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:02:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:03:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:04:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:05:00|2012|    1|
#+----+----+----+-----+----------+-------------------+----+-----+

+------------+----+----+----+-----+----------+-------------------+----+-----+
|   Timestamp|Open|High| Low|Close|Volume_BTC|          Date_Time|Year|Month|
+------------+----+----+----+-----+----------+-------------------+----+-----+
|1.32541206E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:01:00|2012|    1|
|1.32541212E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:02:00|2012|    1|
|1.32541218E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:03:00|2012|    1|
|1.32541224E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:04:00|2012|    1|
| 1.3254123E9|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:05:00|2012|    1|
+------------+----+----+----+-----+----------+-------------------+----+-----+
only showing top 5 rows



###  <strong>Exercise 5.</strong>  Drop Timestamp
    
<p align="justify">
<font size="3">
Finally, we clean the schema by removing the $Timestamp$ column.
</font>
</p>


In [18]:
# write the command that creates a new Spark DataFrame from dfsc without the Timestamp column
# and store the reference in the dfsmc variable (Spark DataFrame Clean)
#'''############## WRITE YOUR CODE HERE ##############'''
dfsmc = dfsc.drop("Timestamp")
#'''############## END OF THE EXERCISE ##############'''


dfsmc.show(5)

#+----+----+----+-----+----------+-------------------+----+-----+
#|Open|High| Low|Close|Volume_BTC|          Date_Time|Year|Month|
#+----+----+----+-----+----------+-------------------+----+-----+
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:01:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:02:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:03:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:04:00|2012|    1|
#|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 10:05:00|2012|    1|
#+----+----+----+-----+----------+-------------------+----+-----+


+----+----+----+-----+----------+-------------------+----+-----+
|Open|High| Low|Close|Volume_BTC|          Date_Time|Year|Month|
+----+----+----+-----+----------+-------------------+----+-----+
|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:01:00|2012|    1|
|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:02:00|2012|    1|
|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:03:00|2012|    1|
|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:04:00|2012|    1|
|4.58|4.58|4.58| 4.58|       0.0|2012-01-01 11:05:00|2012|    1|
+----+----+----+-----+----------+-------------------+----+-----+
only showing top 5 rows



#  C. Using Parquet

<p align="justify">
<font size="3">
To improve performance in what follows, it is a good idea (as seen in the lectures) to use a columnar storage format, here Parquet, which allows us
to partition the Spark DataFrame and store it in multiple Parquet files.
</font>
</p>

<p align="justify">
<font size="3">
Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
</font>
</p>

## C.1 Saving data in Parquet
    
<p align="justify">
<font size="3">
For this first example, partition the file according to:

 <ul>
     <li> the year </li>
     <li> the month of the year </li>
</ul>
The $partitionBy()$ operation can help with this step (reference documentation: <a href="https://spark.apache.org/docs/latest/sql-data-sources-parquet.html">Spark Parquet Files</a>).
</font>
</p>



In [19]:
dfsmc

DataFrame[Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Date_Time: timestamp, Year: int, Month: int]

In [20]:
# save the dfsmc DataFrame in Parquet, partitioned by Year and then by Month

dfsmc.write.partitionBy(["Year", "Month"]).parquet("BTC/", mode='overwrite')


print("write to Parquet done")



write to Parquet done


                                                                                

##  C.2 Check the folder Structure


<p align="justify">
<font size="3">
Look at the folder structure that has been created to store the file. You can see how the partitioning strategy of Parquet and the data distribution of Spark can be used, explicitly or implicitly, to improve performance.

While you navigate the folder structure, remember that when the data is accessed:
    
 <ul>
     <li> the navigation is done through the Parquet partition directories </li>
     <li> the leaves contain the encoded Parquet files </li>
</ul>
</font>
</p>




In [21]:
#BTC
#        ├── Year=2012
#        │   ├── ...
#        │   │
#        │   ├── Month=12
#        ├── Year=2013
#        │   ├── Month=1
#        │   ├── ...
#        │   │
#       ...



This folder structure corresponds to both a physical and a logical partition of the data.
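To inspect the layout programmatically instead of in a file browser, you can walk the folder and decode the Hive-style `key=value` directory names. Below is a small standard-library sketch. The function name `list_partitions` is hypothetical and is not part of Spark or Parquet. The demo builds a toy tree of empty directories that mimics the BTC/ layout.

```python
import os
import tempfile

def list_partitions(root):
    """Return the key=value pairs of every leaf partition directory under root.

    Hive-style names such as Year=2012/Month=1 become {"Year": "2012", "Month": "1"}.
    Values are returned as strings, exactly as they appear in the directory names.
    """
    partitions = []
    for dirpath, dirnames, _ in os.walk(root):
        dirnames.sort()  # visit subdirectories in a deterministic order
        rel = os.path.relpath(dirpath, root)
        if rel == ".":
            continue
        parts = rel.split(os.sep)
        is_partition = all("=" in p for p in parts)
        is_leaf = not any("=" in d for d in dirnames)
        if is_partition and is_leaf:
            partitions.append(dict(p.split("=", 1) for p in parts))
    return partitions

# Demo on a toy tree that mimics the BTC/ layout (directory names only, no data)
with tempfile.TemporaryDirectory() as root:
    for year, month in [("2012", "1"), ("2012", "12"), ("2013", "1")]:
        os.makedirs(os.path.join(root, f"Year={year}", f"Month={month}"))
    found = list_partitions(root)

print(found)
```

On the real output, `list_partitions("BTC")` should return one dictionary per (Year, Month) folder.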

# D. Pandas


<p align="justify">
<font size="3">
This data organization also makes it possible to read the data with Pandas instead of Spark.
    
Look at the documentation and check how you can read a Parquet structure and store it in a Pandas DataFrame:
<a href="https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html">Pandas and Parquet</a>

Notice that Apache Arrow (through the $pyarrow$ library) underlies this data exchange.
</font>
</p>


<p align="justify">
<font size="3">
Write the command that reads the data for the year 2012 using Pandas.
    
</font>
</p>

In [22]:
#import of pandas
import pandas as pa

In [23]:
# Here we show how to create a DataFrame with Pandas functions, reading from Parquet only the data for the year 2012

df = pa.read_parquet("BTC/Year=2012")

df
#######################
# Check the expected output:
#index,Open,High,Low,Close,Volume_BTC,Date_Time,Month
#0,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:01:00,1
#1,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:02:00,1
#...
#...

index,Open,High,Low,Close,Volume_BTC,Date_Time,Month
0,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:01:00,1
1,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:02:00,1
2,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:03:00,1
3,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:04:00,1
4,4.58,4.58,4.58,4.58,0.0,2012-01-01 10:05:00,1
...,...,...,...,...,...,...,...
526374,12.17,12.17,12.17,12.17,0.0,2012-09-30 21:55:00,9
526375,12.17,12.17,12.17,12.17,0.0,2012-09-30 21:56:00,9
526376,12.17,12.17,12.17,12.17,0.0,2012-09-30 21:57:00,9
526377,12.17,12.17,12.17,12.17,0.0,2012-09-30 21:58:00,9


###  D.1 Read Parquet file
    
<p align="justify">
<font size="3">
Here you can see how the Spark DataFrame is created from the Parquet data.
</font>
</p>

In [24]:
# And here is how to create a DataFrame with Spark, reading the whole dataset

dfs_parquet = spark.read.parquet("BTC/")

print("read done")

read done


## <strong>Exercise 6</strong>. Verify the number of columns and count the number of rows
    
<p align="justify">
<font size="3">
You may not have noticed that the volume of data we are handling is not as small as it seems.
Count how many rows we are manipulating in the DataFrame <code>dfs</code>.
</font>
</p>

In [25]:
# Write the command that returns the number of rows of the DataFrame

#'''############## WRITE YOUR ANSWER HERE ##############'''
count = dfs_parquet.count()
#'''############## END OF THE EXERCISE ##############'''

print(count)

#######################
# Expected output:
# 7045648

7133489


In [26]:
#We can also check and verify the schema of the DataFrame
dfs_parquet.printSchema()

root
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_BTC: double (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)



# E. Statistics

<p align="justify">
<font size="3">
We want to compute the monthly statistics of the Bitcoin trading volume (Volume_BTC) for all the years.

The computed statistics will be stored in a DataFrame having this schema
<ul>
     <li>   Mean_Vol  : double </li>
     <li>   Std_Vol   : double </li>
     <li>   Min_Vol   : double </li>
     <li>   Max_Vol   : double </li>
     <li>   Year      : int </li>
     <li>   Month     : int </li>
  
</ul>

In this exercise you will develop two different methodologies to compute the statistics:
<ul>
    <li>   using the <code>applyInPandas()</code> PySpark function together with Pandas functions </li>
     <li>  using only the PySpark functionalities </li>
</ul>
With both approaches, the computed statistics should be stored in a Pandas DataFrame.
</font>
</p>

## E.1. Spark applyInPandas
<p align="justify">
<font size="3">
The solution with $applyInPandas$
</font>
</p>

In [27]:

# the Python function to apply to each (Year, Month) group

def compute_stats(key,df):
    res = df["Volume_BTC"].describe()

    res_dict = {}
    for index, value in res.items():

        if index == "mean":
            res_dict["Mean_Vol"] = value
        elif index == "std":
            res_dict["Std_Vol"] = value
        elif index == "min":
            res_dict["Min_Vol"] = value
        elif index == "max":
            res_dict["Max_Vol"] = value

    final =  pa.DataFrame([res_dict])
    final["Year"]  = key[0]
    final["Month"] = key[1]

    return final

### <strong>Exercise 7</strong>. The two parameters of the Python function
The two parameters of the PySpark <code>applyInPandas(func, schema)</code> function
<p align="justify">
<font size="3">
    Look at the documentation of <code>applyInPandas(func, schema)</code> (<a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html">click here to go to the documentation of <code>applyInPandas</code></a>) and describe in detail how it works from the DataFrame point of view in our example (what $key$ and $df$ will contain).

</font>
</p>

#### WRITE YOUR ANSWER HERE ###

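<code>applyInPandas(func, schema)</code> takes two arguments: the Python function to apply to each group, and the schema of the DataFrame that this function returns, given as a DDL string (or a StructType).
In our case:

key = (year, month) --> the tuple with the grouping-key values of the current group

df = a Pandas DataFrame containing all the rows of that (year, month) group

Example (from the Spark documentation, where <code>normalize</code> is a user-defined function):
df.groupby("id").applyInPandas(
    normalize, schema="id long, v double").show()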
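To see concretely what `key` and `df` contain, you can emulate the grouping with plain Pandas on a tiny made-up dataset. This is only an emulation. Spark runs the function on the executors, one group at a time, and loads each whole group into memory, whereas here Pandas loops locally. The `compute_stats` below is a condensed equivalent of the function above, not the original.

```python
import pandas as pd

def compute_stats(key, df):
    """Condensed equivalent of the notebook's compute_stats (one output row per group)."""
    desc = df["Volume_BTC"].describe()
    out = pd.DataFrame([{
        "Mean_Vol": desc["mean"],
        "Std_Vol": desc["std"],
        "Min_Vol": desc["min"],
        "Max_Vol": desc["max"],
    }])
    out["Year"] = key[0]   # first grouping key
    out["Month"] = key[1]  # second grouping key
    return out

# Made-up minute-level data for two months
pdf = pd.DataFrame({
    "Year": [2012, 2012, 2012],
    "Month": [1, 1, 2],
    "Volume_BTC": [1.0, 3.0, 5.0],
})

# Local emulation of dfsmc.groupby(["Year", "Month"]).applyInPandas(compute_stats, schema):
# key is the (Year, Month) tuple, group is a Pandas DataFrame with that group's rows
results = pd.concat(
    [compute_stats(key, group) for key, group in pdf.groupby(["Year", "Month"])],
    ignore_index=True,
)
print(results)
```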

### <strong>Exercise 8</strong>. The two parameters in action
<p align="justify">
<font size="3">
Compute the statistics using then the $applyInPandas$ and the provided functions.

</font>
</p>

In [28]:
schema = "Mean_Vol double, Std_Vol double, Min_Vol double, Max_Vol double, Year int, Month int"

# Write the command that stores the statistics DataFrame in the variable statsdf

#'''############## WRITE YOUR ANSWER HERE ##############'''

statsdf = dfsmc.groupby(["Year", "Month"]).applyInPandas(compute_stats, schema)

#'''############## END OF THE EXERCISE ##############'''

statsdf.show(5)


####### EXPECTED OUTPUT
#+-------------------+------------------+-------+-------------+----+-----+
#|           Mean_Vol|           Std_Vol|Min_Vol|      Max_Vol|Year|Month|
#+-------------------+------------------+-------+-------------+----+-----+
#|0.38157795769537034| 5.603393348899759|    0.0| 300.51609759|2012|    4|
#| 1.8559290136413529|14.854252197789647|    0.0|  876.5767966|2012|    8|
#| 1.8616498090071685|17.456999674520247|    0.0|2258.82314049|2012|   10|
#|  2.046982322757616|16.594671862785212|    0.0| 876.09948429|2012|   12|
#| 3.0632658169813984|24.861354787399602|    0.0|1306.36619719|2013|    2|
#+-------------------+------------------+-------+-------------+----+-----+#

print(type(statsdf))


+------------------+------------------+-------+-------------+----+-----+
|          Mean_Vol|           Std_Vol|Min_Vol|      Max_Vol|Year|Month|
+------------------+------------------+-------+-------------+----+-----+
|1.8581678891096196|17.444529026776845|    0.0|2258.82314049|2012|   10|
|11.973132482290511|50.208006597831286|    0.0|4111.87610626|2014|    4|
| 8.724308122807043| 38.13396448398607|    0.0|2037.22390381|2015|    2|
|  9.46905688541241|29.596392308584278|    0.0|1041.54305324|2015|   12|
| 2.650074683146505|12.702303333399795|    0.0|  530.1407849|2016|    7|
+------------------+------------------+-------+-------------+----+-----+
only showing top 5 rows

<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

### <strong>Exercise 9</strong>. The statsdf DataFrame
<p align="justify">
<font size="3">
Which kind of DataFrame is statsdf?
</font>
</p>

#### WRITE YOUR ANSWER HERE ###

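It is a Spark DataFrame (<code>pyspark.sql.dataframe.DataFrame</code>), as the printed type confirms: applyInPandas uses Pandas only inside each group, and the result is again a distributed Spark DataFrame.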


### <strong>Exercise 10</strong>. DataFrame in Pandas


<p align="justify">
<font size="3">
Since we computed one set of statistics per month, the result is small (only one row per month),
so we can collect all of it and handle it in memory with Pandas.
    
Notice that Spark is lazy, so the $toPandas$ action triggers the computation.
    
Write the command that performs this operation.
    
</font>
</p>



In [29]:
%%time

# Write the command that stores the output DataFrame in the variable stats_dfp

#'''############## WRITE YOUR ANSWER HERE ##############'''
stats_dfp = statsdf.toPandas()
#'''############## END OF THE EXERCISE ##############'''


CPU times: user 10.9 ms, sys: 7.28 ms, total: 18.2 ms
Wall time: 5.26 s


                                                                                

In [30]:
#results
stats_dfp.head(10)

#######################
# Expected output:
# Mean_Vol	Std_Vol	Min_Vol	Max_Vol	Year	Month
# 0	21.683913	36.393538	1.275510e-03	300.516098	2012	4
# 1	21.108184	46.034723	0.000000e+00	876.576797	2012	8
# 2	20.396136	54.246996	9.400000e-05	2258.823141	2012	10
# 3	25.393412	53.167923	9.559000e-04	876.099484	2012	12
# 4	19.510576	60.025050	1.000000e-08	1306.366197	2013	2
# 5	13.141915	35.899066	0.000000e+00	1330.568747	2013	3
# 6	8.802936	29.688012	0.000000e+00	1000.041575	2013	6
# 7	11.753656	39.856527	1.000000e-08	1138.049014	2013	9
# 8	18.445597	58.566351	0.000000e+00	1699.510865	2013	10
# 9	22.860416	57.255573	4.442000e-05	2196.866405	2013	12


index,Mean_Vol,Std_Vol,Min_Vol,Max_Vol,Year,Month
0,1.858168,17.444529,0.0,2258.82314,2012,10
1,11.973132,50.208007,0.0,4111.876106,2014,4
2,8.724308,38.133964,0.0,2037.223904,2015,2
3,9.469057,29.596392,0.0,1041.543053,2015,12
4,2.650075,12.702303,0.0,530.140785,2016,7
5,7.183424,26.878504,0.0,1616.060001,2017,3
6,8.445264,17.65373,0.0,533.100783,2017,8
7,8.346753,18.559923,0.0,602.282607,2017,10
8,2.649536,9.968894,0.0,582.564185,2018,10
9,8.272884,18.311089,0.0,806.636224,2019,5


###  <strong>Exercise 11</strong>. Show the stats of the stats


<p align="justify">
<font size="3">
We now want to summarise the monthly statistics of Bitcoin over all the years.

The computed statistics are stored in a DataFrame with this schema:
<ul>
     <li>   the min of the set of min values </li>
     <li>   the mean of the set of mean values </li>
     <li>   ... </li>
</ul>


    
</font>
</p>
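`describe()` computes the count, mean, standard deviation, min and max of each numeric column, which gives the "stats of the stats". Below is a minimal pandas sketch of the same idea. It uses a made-up monthly table that borrows the `statsdf` column names. pandas' `describe()` also reports quartiles. As far as we know, both libraries use the sample standard deviation.

```python
import pandas as pd

# Hypothetical monthly statistics (same column names as statsdf, made-up values).
monthly = pd.DataFrame({
    "Mean_Vol": [2.0, 4.0, 6.0],
    "Std_Vol": [1.0, 3.0, 5.0],
    "Min_Vol": [0.0, 0.5, 0.0],
    "Max_Vol": [10.0, 40.0, 25.0],
})

# pandas' describe(): count, mean, std, min, quartiles and max for each column.
summary = monthly.describe()
print(summary)

# The "stats of the stats" from the bullet list, computed explicitly.
min_of_mins = monthly["Min_Vol"].min()
mean_of_means = monthly["Mean_Vol"].mean()
max_of_maxes = monthly["Max_Vol"].max()
print(min_of_mins, mean_of_means, max_of_maxes)
```

The mean of the monthly means weights every month equally. It therefore generally differs from the mean volume over all minutes, because months contain different numbers of non-null rows.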


In [31]:
# Write the command that will show and compute the stats on the numerical columns of the statsdf DataFrame

#'''############## WRITE YOUR ANSWER HERE ##############'''
statsdf.describe().show()
#'''############## END OF THE EXERCISE ##############'''



#######################
# Expected output:
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+
#|summary|          Mean_Vol|           Std_Vol|             Min_Vol|           Max_Vol|              Year|             Month|
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+
#|  count|               112|               112|                 112|               112|               112|               112|
#|   mean|10.782191354847754|28.871463944232485|0.004551177678571...|1067.2847720235718|2016.0892857142858| 6.428571428571429|
#| stddev| 6.488551661205522| 18.11344145463867|0.043048607639448476| 895.8083462469303|2.7164947320662614|3.5252353718985097|
#|    min| 2.929999689326444| 6.490701567379118|                 0.0|       43.31219578|              2011|                 1|
#|    max|31.504423573146152|106.97606692383131|          0.45558087|      5853.8521659|              2021|                12|
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+


25/07/26 18:02:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-------------------+------------------+-------+-----------------+------------------+-----------------+
|summary|           Mean_Vol|           Std_Vol|Min_Vol|          Max_Vol|              Year|            Month|
+-------+-------------------+------------------+-------+-----------------+------------------+-----------------+
|  count|                163|               163|    163|              163|               163|              163|
|   mean|  5.224481594014435|16.925689954544897|    0.0| 802.547053819816|2018.3006134969326|6.392638036809816|
| stddev|  4.508721487188216|13.944503183296401|    0.0|846.9727336037818| 3.937999381344645|3.450595909927153|
|    min|0.04573099515427818|0.8360619319727159|    0.0|      43.31219578|              2012|                1|
|    max|  22.37783345623565| 72.91283698037434|    0.0|    5853.85216588|              2025|               12|
+-------+-------------------+------------------+-------+-----------------+------------------+-----------------+

                                                                                

# F. Plotting and equivalence
<p align="justify">
<font size="3">
We want to plot the results of the statistics by year and month (the date goes on the horizontal $x$ axis of the plot).

$Plotly$ is used for plotting.
    

The provided code runs as-is in Python.
    

A helper function converts the two columns $Year$ and $Month$ into a single $DateTime$ column 'Date', so that the data can be plotted against time.
    
</font>
</p>
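Here is a hedged sketch of the Year/Month conversion on a small made-up frame. The string-based version mirrors the helper function. As an alternative that avoids building strings, `pd.to_datetime` can also assemble dates directly from `year`/`month`/`day` columns. The sample values are hypothetical.

```python
import pandas as pd

# Hypothetical Year/Month pairs, deliberately out of order.
stats = pd.DataFrame({"Year": [2012, 2012, 2025], "Month": [3, 1, 6]})

# String-based conversion, as in the notebook's helper function.
via_strings = pd.to_datetime(
    stats["Year"].astype(str) + "-" + stats["Month"].astype(str), format="%Y-%m"
)

# Alternative: assemble datetimes from year/month/day columns (day fixed to 1).
parts = stats.rename(columns={"Year": "year", "Month": "month"}).assign(day=1)
via_columns = pd.to_datetime(parts[["year", "month", "day"]])

print(via_columns.tolist())
```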



In [32]:
# import the plotly libraries (plotly must be installed, e.g. with pip install plotly)

from plotly.offline import iplot,init_notebook_mode
import plotly.graph_objects as go

init_notebook_mode(connected=True)

In [33]:
#Helper function that converts the Year Month of our data into Date type

def get_date_from_year_month(df):
    df["Date"] = pa.to_datetime(df['Year'].astype(str) + '-' + df['Month'].astype(str), format='%Y-%m')
    return df


In [34]:
# Spark returns the groups in no guaranteed order (the aggregation runs in parallel and shuffles the data), so we sort the results by date

stats_dfp = get_date_from_year_month(stats_dfp)
stats_dfp.sort_values(by = 'Date',inplace = True)
stats_dfp

,Mean_Vol,Std_Vol,Min_Vol,Max_Vol,Year,Month,Date
110,0.045731,0.836062,0.0,43.312196,2012,1,2012-01-01
111,0.114048,1.697511,0.0,92.654874,2012,2,2012-02-01
125,0.202970,3.518784,0.0,247.560124,2012,3,2012-03-01
15,0.380452,5.602053,0.0,300.516098,2012,4,2012-04-01
126,0.415684,5.743854,0.0,384.988000,2012,5,2012-05-01
...,...,...,...,...,...,...,...
94,1.234611,4.228104,0.0,137.016185,2025,3,2025-03-01
109,1.137380,3.744627,0.0,116.672235,2025,4,2025-04-01
30,0.823152,2.669474,0.0,82.260283,2025,5,2025-05-01
31,0.636281,2.119212,0.0,59.014533,2025,6,2025-06-01


In [35]:
# PLOTTING OF THE MEAN VOLUME BY MONTH
mean_vol_trac = {
    "x": stats_dfp.Date,
    "y": stats_dfp["Mean_Vol"],
}

layout = {
  "height":1000,
  "showlegend": True,
  "title": "Average Volume by Month of BTC",
}

fig = go.Figure(data=[mean_vol_trac], layout=layout)
fig.show()

###  <strong> Exercise 12 </strong> - Compute the statistics using Pyspark


<p align="justify">
<font size="3">
We want to calculate the same Bitcoin statistics as before, but this time with PySpark's built-in aggregation functions instead of applyInPandas.

The steps will be:
<ul>
     <li>   import data from Parquet in a Spark DataFrame </li>
     <li>   remove null values </li>
     <li>   perform the aggregation of the results </li>
     <li>   convert the results to Pandas </li>

</ul>


    
</font>
</p>
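To make the steps concrete, here is a small pandas analogue of the same pipeline. It runs on hypothetical minute rows, not on the Parquet data. `dropna()` plays the role of `na.drop()`, and a named `groupby().agg()` plays the role of `groupBy().agg()`. pandas already skips NaN in `mean` and `std` by default, so here `dropna()` mirrors the Spark step rather than being strictly required.

```python
import numpy as np
import pandas as pd

# Hypothetical minute-level rows; NaN marks a minute without trades.
minutes = pd.DataFrame({
    "Year": [2021, 2021, 2021, 2021, 2021],
    "Month": [1, 1, 1, 2, 2],
    "Volume_BTC": [1.0, np.nan, 3.0, 2.0, 6.0],
})

# Analogue of df.na.drop(): drop rows that contain any missing value.
clean = minutes.dropna()

# Analogue of groupBy(...).agg(min, max, mean, stddev); pandas' std is the
# sample standard deviation (ddof=1).
stats = (
    clean.groupby(["Year", "Month"])["Volume_BTC"]
    .agg(Min_Vol="min", Max_Vol="max", Mean_Vol="mean", Std_Vol="std")
    .reset_index()
)
print(stats)
```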

In [36]:
%%time
# solution to compute the statistics using pyspark function

# importing the functions module as F avoids shadowing Python's built-in min and max
from pyspark.sql import functions as F


# full spark dataframe (recall exercise 8a)
df_spark = spark.read.parquet("BTC/")

# na.drop() removes the rows containing null or NaN values, so that NaNs do not
# propagate into the statistics (see the documentation of the na.drop function)
group_ym = df_spark.na.drop().select(["Volume_BTC","Year","Month"]).groupBy(["Year","Month"])

#'''############## WRITE YOUR ANSWER HERE ##############'''


# aggregation
# one aggregate expression per output column: Min_Vol, Max_Vol, Mean_Vol and Std_Vol
res_df = group_ym.agg(
    F.min("Volume_BTC").alias("Min_Vol"),
    F.max("Volume_BTC").alias("Max_Vol"),
    F.mean("Volume_BTC").alias("Mean_Vol"),
    F.stddev("Volume_BTC").alias("Std_Vol"))


# convert the results to Pandas
stats_dfs = res_df.toPandas()

#'''############## END OF THE EXERCISE ##############'''

stats_dfs = get_date_from_year_month(stats_dfs)
stats_dfs.sort_values(by = 'Date',inplace = True)
stats_dfs

CPU times: user 7.77 ms, sys: 4.23 ms, total: 12 ms
Wall time: 885 ms


                                                                                

,Year,Month,Min_Vol,Max_Vol,Mean_Vol,Std_Vol,Date
158,2012,1,0.0,43.312196,0.045731,0.836062,2012-01-01
159,2012,2,0.0,92.654874,0.114048,1.697511,2012-02-01
160,2012,3,0.0,247.560124,0.202970,3.518784,2012-03-01
154,2012,4,0.0,300.516098,0.380452,5.602053,2012-04-01
161,2012,5,0.0,384.988000,0.415684,5.743854,2012-05-01
...,...,...,...,...,...,...,...
72,2025,3,0.0,137.016185,1.234611,4.228104,2025-03-01
77,2025,4,0.0,116.672235,1.137380,3.744627,2025-04-01
51,2025,5,0.0,82.260283,0.823152,2.669474,2025-05-01
84,2025,6,0.0,59.014533,0.636281,2.119212,2025-06-01


### Extra. Equivalence of results



<p align="justify">
<font size="3">
Now that you have seen the two ways of computing the results, compare their outputs:
<ul>
     <li>   verify whether the Pandas DataFrames obtained with applyInPandas and with the PySpark functions are equivalent (look in the documentation for the function that asserts that two DataFrames are equal) </li>
         <li> compare the processing times of the applyInPandas routine and of the PySpark routine (measured with the %%time magic) and comment on them.</li>

</ul>


    
</font>
</p>
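Rounding both frames to 6 decimals works. A hedged alternative is to align the rows on the grouping keys and then let `pd.testing.assert_frame_equal` apply a relative tolerance (`rtol`). The frames below are made up, and `same_stats` is a hypothetical helper name.

```python
import pandas as pd

# Two hypothetical results with the same content but different column order,
# row order and tiny floating-point differences.
left = pd.DataFrame({"Year": [2012, 2012], "Month": [1, 2], "Mean_Vol": [0.045731, 0.114048]})
right = pd.DataFrame({"Mean_Vol": [0.1140480000001, 0.0457310000002], "Month": [2, 1], "Year": [2012, 2012]})


def same_stats(a, b, keys=("Year", "Month")):
    """Return True when a and b hold the same rows, up to order and float tolerance."""
    a = a.sort_values(list(keys)).reset_index(drop=True)
    b = b.sort_values(list(keys)).reset_index(drop=True)
    try:
        # check_like ignores column order; rtol replaces manual rounding.
        pd.testing.assert_frame_equal(a, b, check_like=True, check_dtype=False, rtol=1e-6)
    except AssertionError:
        return False
    return True


print(same_stats(left, right))
```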



In [37]:
# verify whether the Pandas DataFrames from applyInPandas and from the PySpark functions are equivalent
# compare the processing time between applyInPandas and the PySpark functions
cols = ['Mean_Vol', 'Std_Vol', 'Min_Vol', 'Max_Vol', 'Year', 'Month', 'Date']

#'''############## WRITE YOUR ANSWER HERE ##############'''
# comment about time execution and draw your considerations

import pandas as pd

# Select the same columns, sort by date, reset the index and round
stats_dfp = stats_dfp[cols].sort_values(by='Date').reset_index(drop=True).round(6)
stats_dfs = stats_dfs[cols].sort_values(by='Date').reset_index(drop=True).round(6)

# Equivalence check (check_like=True also ignores column order, as a safety net)
try:
    pd.testing.assert_frame_equal(
        stats_dfp,
        stats_dfs,
        check_like=True
    )
    print("The DataFrames are equivalent.")
except AssertionError as e:
    print("The DataFrames are different.")
    print(e)

# Timing: in this run, collecting statsdf (applyInPandas) took about 5.26 s of wall time,
# versus about 885 ms for the built-in PySpark aggregation. Built-in functions run inside
# Spark, whereas applyInPandas ships every group to Python workers; caching effects
# from the earlier reads may also account for part of the gap.
#'''############## END OF THE EXERCISE ##############'''


The DataFrames are equivalent.


### Extra - Plotting the financial data



<p align="justify">
<font size="3">
Now that you have seen some examples, you can draw your own graphs:
<ul>
     <li>   filter the global DataFrame read from Parquet, keeping only the first day of 2021 </li>
         <li> convert it to a Pandas DataFrame </li>
         <li>    display the data using the $plot\_candlestick$ routine </li>

</ul>


    
</font>
</p>
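Filtering one day is easiest with a half-open interval [start, end). It keeps every minute of the day and excludes midnight of the next day. Below is a pandas sketch on a few hypothetical rows. It assumes `Date_Time` is derived from the Unix `Timestamp` column expressed in seconds; `pd.to_datetime(..., unit="s")` gives naive UTC times.

```python
import pandas as pd

# Hypothetical minute rows around the day boundaries (Unix time, seconds).
rows = pd.DataFrame({"Timestamp": [1609459140, 1609459200, 1609545540, 1609545600]})
rows["Date_Time"] = pd.to_datetime(rows["Timestamp"], unit="s")

beg = pd.Timestamp("2021-01-01")
end = pd.Timestamp("2021-01-02")

# Half-open interval [beg, end): keeps 00:00 to 23:59 of 1 Jan 2021 only.
first_day = rows[(rows["Date_Time"] >= beg) & (rows["Date_Time"] < end)]
print(first_day)
```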



In [38]:
# this function displays the candlestick chart (a standard representation of financial data) of a Pandas DataFrame

def plot_candlestick(df):
    # open/close come from the matching columns of df (not from the global dfp)
    trace = {
      "x": df.Date_Time,
      "open": df["Open"],
      "high": df["High"],
      "low": df["Low"],
      "close": df["Close"],
      "increasing": {"line": {"color": "#008000"}},
      "decreasing": {"line": {"color": "#db4052"}},
      "name": "BTC",
      "type": "candlestick"
    }

    layout = {
      "height":1000,
      "showlegend": True,
      "title": "Technical Analysis",
    }

    fig = go.Figure(data=[trace], layout=layout)
    fig.show()

In [39]:
# Exercise: filter the Spark DataFrame by date


#'''############## WRITE YOUR ANSWER HERE ##############'''
from pyspark.sql.functions import col
from datetime import datetime

#read the global dataframe (as usual)
df_spark = spark.read.parquet("BTC/")

# start (inclusive) and end (exclusive) of the first day of 2021
beg = datetime(2021, 1, 1)
end = datetime(2021, 1, 2)


#create the filter
df_spark_filtered = df_spark.filter((col("Date_Time") >= beg) & (col("Date_Time") < end))

#apply and convert it to pandas
dfp = df_spark_filtered.toPandas()

#'''############## END OF THE EXERCISE ##############'''



In [40]:
plot_candlestick(dfp)
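A full day of one-minute rows gives 1,440 candles, which can be hard to read. One hedged option is to resample to hourly candles before plotting: first open, highest high, lowest low and last close in each hour. The sketch uses synthetic prices. On the notebook data it would presumably start from `dfp.set_index("Date_Time")`.

```python
import numpy as np
import pandas as pd

# Hypothetical one-minute OHLC rows for a single day (1,440 minutes).
idx = pd.date_range("2021-01-01", periods=1440, freq="min")
rng = np.random.default_rng(0)
close = 29000 + rng.normal(0, 5, size=len(idx)).cumsum()
minute = pd.DataFrame(
    {"Open": close - 1, "High": close + 2, "Low": close - 3, "Close": close}, index=idx
)

# Hourly candles: first open, highest high, lowest low, last close.
hourly = minute.resample("60min").agg(
    {"Open": "first", "High": "max", "Low": "min", "Close": "last"}
)
# Expose the time as a Date_Time column, as plot_candlestick expects.
hourly = hourly.reset_index().rename(columns={"index": "Date_Time"})
print(len(hourly))
```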

### Extra - Propose your analysis



<p align="justify">
<font size="3">
Design a new analysis of this dataset, run it, and show its result in a graph.
</font>
</p>



In [41]:
#'''############## WRITE YOUR ANSWER HERE ##############'''

# In this analysis, we look at the times of day at which the traded Bitcoin volume is highest.
# year 2024

import datetime as dt
import plotly.express as px

# Define the start and end of the year
beg = dt.datetime(2024, 1, 1)
end = dt.datetime(2025, 1, 1)

df_spark_filtered = df_spark.filter((col("Date_Time") >= beg) & (col("Date_Time") < end))

# Convert to Pandas
dfp = df_spark_filtered.toPandas()

dfp["Date_Time"] = pa.to_datetime(dfp["Date_Time"])

# Extract the hour to group by hour of the day
dfp["Hour"] = dfp["Date_Time"].dt.hour

# Group by hour and sum the volumes
volume_per_hour = dfp.groupby("Hour")["Volume_BTC"].sum().reset_index()

# Plot the volume per hour
fig = px.bar(volume_per_hour, x="Hour", y="Volume_BTC", title="Traded volume by hour", labels={"Volume_BTC": "Volume BTC"})
fig.update_layout(height=500)
fig.show()

#'''############## END OF THE EXERCISE ##############'''

### Conclusion

<p align="justify">
<font size="3">
$applyInPandas$ can be very powerful when you need to apply advanced Python code or Python libraries (e.g. <a href="https://scikit-learn.org/stable/">scikit-learn</a>). Otherwise, you can use PySpark's built-in routines, which run inside Spark and benefit from efficient storage formats such as Parquet.

    
</font>
</p>
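Here is the contrast in concrete form. `applyInPandas` takes a function that receives one group as a pandas DataFrame and returns a pandas DataFrame, so any Python library can run inside it. The function below is a hypothetical example, tested locally with plain pandas. In Spark it would presumably be passed as `df.groupBy("Year", "Month").applyInPandas(monthly_stats, schema=...)`, with a schema matching the returned columns. The exact column types in that schema are an assumption.

```python
import pandas as pd


# Hypothetical per-group function of the kind passed to applyInPandas:
# it receives all rows of one (Year, Month) group as a pandas DataFrame
# and returns a pandas DataFrame (here, a single summary row).
def monthly_stats(pdf: pd.DataFrame) -> pd.DataFrame:
    vol = pdf["Volume_BTC"]
    return pd.DataFrame({
        "Mean_Vol": [vol.mean()],
        "Std_Vol": [vol.std()],  # sample standard deviation (NaN for one row)
        "Min_Vol": [vol.min()],
        "Max_Vol": [vol.max()],
        "Year": [pdf["Year"].iloc[0]],
        "Month": [pdf["Month"].iloc[0]],
    })


# Local check on made-up data: call the function once per group.
data = pd.DataFrame({"Year": [2021, 2021, 2021], "Month": [1, 1, 2], "Volume_BTC": [1.0, 3.0, 5.0]})
local = pd.concat(
    [monthly_stats(g) for _, g in data.groupby(["Year", "Month"])], ignore_index=True
)
print(local)
```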