In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 45.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=bdb1164c9fa34b41d7351b9b076ac7b151622d97f33821d22da5c6f862658333
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.window import Window

# **Import data from github**

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("BT1_NguyenDucMinhTan").getOrCreate()
url_github = 'https://raw.githubusercontent.com/BrianNguyen2001/Predict-Etherium-price-movement/main/Data/pred_eth_data_price.csv'

pd_df = pd.read_csv(url_github)
df = spark.createDataFrame(pd_df)

In [None]:
df1 = df
df1.show(10)
df1.printSchema()

+----------+-------------------+------------------+--------+--------+------------------+----------------+------------------+--------+-----------+-----------+-----------+-----------+----------+
|      Time|           polarity|      subjectivity|Negative|Positive|    price_etherium|   price_bitcoin|    price_litecoin|avg_diff|act_address|    avg_gas|trans_count|block_count|block_size|
+----------+-------------------+------------------+--------+--------+------------------+----------------+------------------+--------+-----------+-----------+-----------+-----------+----------+
|2017-11-09| 0.1488095238095238|0.1251984126984127|     0.0|     1.0| 320.8840026855469|  7143.580078125| 64.26969909667969| 1.42E15|     243554|0.222171423|   467172.0|       6195|  86289173|
|2017-11-10|                0.0|               0.0|     0.0|     0.0|299.25299072265625|6618.14013671875|59.260101318359375| 1.43E15|     233966| 0.19108772|   457427.0|       6300|  84500680|
|2017-11-11|               0.05|0.2

# **Cast value**

In [None]:
df1 = df1.withColumn('Time',df1['Time'].cast('date'))

In [None]:
name_cols = []
for i in df1.columns[1:]:
  name_cols.append(''.join(i))

for j in name_cols:
  df1 = df1.withColumn(j,df1[j].cast('float'))

In [None]:
df1.show(10)
df1.printSchema()

+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+-----------+
|      Time|    polarity|subjectivity|Negative|Positive|price_etherium|price_bitcoin|price_litecoin|     avg_diff|act_address|   avg_gas|trans_count|block_count| block_size|
+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+-----------+
|2017-11-09|  0.14880952|  0.12519841|     0.0|     1.0|       320.884|      7143.58|       64.2697|1.41999994E15|   243554.0|0.22217143|   467172.0|     6195.0|8.6289176E7|
|2017-11-10|         0.0|         0.0|     0.0|     0.0|       299.253|      6618.14|       59.2601|1.42999997E15|   233966.0|0.19108772|   457427.0|     6300.0| 8.450068E7|
|2017-11-11|        0.05|  0.27291667|     0.0|     1.0|       314.681|       6357.6|       62.3033|1.43999999E15|   194696.0|0.18

# **Transform Values in a Column of a DataFrame**

In [None]:
my_window = Window.partitionBy().orderBy("Time")
df1 = df1.withColumn("return_etherium",round((lag(df1.price_etherium).over(my_window)-df1.price_etherium)/df1.price_etherium,4))
df1 = df1.withColumn("return_bitcoin",round((lag(df1.price_bitcoin).over(my_window)-df1.price_bitcoin)/df1.price_bitcoin,4))
df1 = df1.withColumn("return_litecoin",round((lag(df1.price_litecoin).over(my_window)-df1.price_litecoin)/df1.price_litecoin,4))

df1 = df1.filter(df1.return_etherium.isNotNull())
df1.show()


+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|    polarity|subjectivity|Negative|Positive|price_etherium|price_bitcoin|price_litecoin|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-11-10|         0.0|         0.0|     0.0|     0.0|       299.253|      6618.14|       59.2601|1.42999997E15|   233966.0|0.19108772|   457427.0|     6300.0|  8.450068E7|         0.0723|        0.0794|         0.0845|
|2017-11-11|        0.05|  0.27291667|     0.0|     1.0|       314.681|       6357.6|       62.3033|1.43999999E1

# **Select Columns from a DataFrame**

In [None]:
df_select = df1.select("return_etherium",'return_bitcoin','return_litecoin')
df_select.show(10)

+---------------+--------------+---------------+
|return_etherium|return_bitcoin|return_litecoin|
+---------------+--------------+---------------+
|         0.0723|        0.0794|         0.0845|
|         -0.049|         0.041|        -0.0488|
|          0.022|        0.0685|         0.0559|
|        -0.0278|       -0.0929|        -0.0389|
|        -0.0619|       -0.0115|        -0.0205|
|         0.0128|       -0.0929|        -0.0179|
|         0.0074|       -0.0707|        -0.1018|
|        -0.0044|        0.0211|         0.0517|
|        -0.0438|       -0.0104|        -0.0298|
|        -0.0191|       -0.0307|        -0.0272|
+---------------+--------------+---------------+
only showing top 10 rows



***or***

In [None]:
df1.createOrReplaceTempView("ETH_Table")

str_SQL='''
SELECT 
return_etherium Return_ETH,
return_bitcoin Return_BTC,
return_litecoin Return_LTC
FROM ETH_Table
'''
spark.sql(str_SQL).show(10)

+----------+----------+----------+
|Return_ETH|Return_BTC|Return_LTC|
+----------+----------+----------+
|    0.0723|    0.0794|    0.0845|
|    -0.049|     0.041|   -0.0488|
|     0.022|    0.0685|    0.0559|
|   -0.0278|   -0.0929|   -0.0389|
|   -0.0619|   -0.0115|   -0.0205|
|    0.0128|   -0.0929|   -0.0179|
|    0.0074|   -0.0707|   -0.1018|
|   -0.0044|    0.0211|    0.0517|
|   -0.0438|   -0.0104|   -0.0298|
|   -0.0191|   -0.0307|   -0.0272|
+----------+----------+----------+
only showing top 10 rows



# **Filter Rows from a DataFrame Problem**

In [None]:
df_filter = df1.filter((df1.Positive == 1.0) & (df1.return_etherium >=0))
df_filter.show()

+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|    polarity|subjectivity|Negative|Positive|price_etherium|price_bitcoin|price_litecoin|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-11-15|  0.21666667|       0.775|     0.0|     1.0|       333.357|      7315.54|       63.8236|1.41999994E15|   258219.0| 0.2525046|   540593.0|     6237.0| 1.0261792E8|         0.0128|       -0.0929|        -0.0179|
|2017-11-16|  0.12901786|   0.4547619|     0.0|     1.0|       330.924|      7871.69|       71.0602|1.40000002E1

In [None]:
str_SQL='''
SELECT *
FROM ETH_Table
WHERE Positive = 1.0
AND (return_etherium > 0) OR (return_etherium = 0)

'''
spark.sql(str_SQL).show()

+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|    polarity|subjectivity|Negative|Positive|price_etherium|price_bitcoin|price_litecoin|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+------------+--------+--------+--------------+-------------+--------------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-11-15|  0.21666667|       0.775|     0.0|     1.0|       333.357|      7315.54|       63.8236|1.41999994E15|   258219.0| 0.2525046|   540593.0|     6237.0| 1.0261792E8|         0.0128|       -0.0929|        -0.0179|
|2017-11-16|  0.12901786|   0.4547619|     0.0|     1.0|       330.924|      7871.69|       71.0602|1.40000002E1

# **Delete a Column from an Existing DataFrame**

In [None]:
df_drop = df1.drop('polarity','price_etherium','price_bitcoin','price_litecoin')
df_drop.show()

+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|subjectivity|Negative|Positive|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-11-10|         0.0|     0.0|     0.0|1.42999997E15|   233966.0|0.19108772|   457427.0|     6300.0|  8.450068E7|         0.0723|        0.0794|         0.0845|
|2017-11-11|  0.27291667|     0.0|     1.0|1.43999999E15|   194696.0|0.18674426|   394378.0|     6267.0| 7.9370304E7|         -0.049|         0.041|        -0.0488|
|2017-11-12|         0.0|     0.0|     0.0|1.48999999E15|   213315.0|0.21059239|   434380.0|     6245.0| 8.2639584E7|          0.022|        0.0685|         0.0559|
|2017-11-1

# **Create and Use a PySpark SQL UDF**

In [None]:
def labeltrend(returns) :
  if returns > float(0.0):
    return "Increasing"
  else:
    return "Decreasing"

# **Data Labeling Problem**


In [None]:
labeltrendUdf = udf(labeltrend)

df_labeling = df_drop.withColumn("trend", labeltrendUdf(df_drop.return_etherium))
df_labeling.show()


+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+----------+
|      Time|subjectivity|Negative|Positive|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|     trend|
+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+----------+
|2017-11-10|         0.0|     0.0|     0.0|1.42999997E15|   233966.0|0.19108772|   457427.0|     6300.0|  8.450068E7|         0.0723|        0.0794|         0.0845|Increasing|
|2017-11-11|  0.27291667|     0.0|     1.0|1.43999999E15|   194696.0|0.18674426|   394378.0|     6267.0| 7.9370304E7|         -0.049|         0.041|        -0.0488|Decreasing|
|2017-11-12|         0.0|     0.0|     0.0|1.48999999E15|   213315.0|0.21059239|   434380.0|     6245.0| 8.2639584E7|   

# **Perform Descriptive Statistics on a Column of a DataFrame**


In [None]:
perform_descriptive_statistics = df_drop.agg(sum("return_etherium"), avg("return_etherium"),stddev_samp("return_etherium"))
perform_descriptive_statistics.show()

+--------------------+--------------------+----------------------------+
|sum(return_etherium)|avg(return_etherium)|stddev_samp(return_etherium)|
+--------------------+--------------------+----------------------------+
|   0.538700000000001|3.241275571600487E-4|        0.053592854947612775|
+--------------------+--------------------+----------------------------+



# **Calculate Covariance Problem**

In [None]:
df_drop.cov('return_etherium','Positive')

-0.00018753230297089478

# **Calculate Correlation Problem**


In [None]:
df_drop.corr('return_etherium','Positive')

-0.0077330560007848395

# **Describe a DataFrame Problem**

In [None]:
dataDescription = df_drop.describe()
dataDescription.show()

+-------+-------------------+------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+--------------------+
|summary|       subjectivity|          Negative|           Positive|            avg_diff|       act_address|           avg_gas|      trans_count|       block_count|         block_size|     return_etherium|      return_bitcoin|     return_litecoin|
+-------+-------------------+------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+--------------------+
|  count|               1662|              1662|               1662|                1662|              1662|              1662|             1662|              1662|               1662|                1662|                1662|                1662|
|   mean

# **Sort Data in a DataFrame Problem**

In [None]:
df_sort_dataframe = df_drop.orderBy("return_etherium")
df_sort_dataframe.show(6)


+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|subjectivity|Negative|Positive|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-12-12|       0.075|     1.0|     0.0|1.56000004E15|   422384.0| 1.2222886|   872340.0|     5715.0| 1.4634464E8|        -0.2092|       -0.0275|        -0.3225|
|2021-01-03|        0.25|     0.0|     1.0|3.86999988E15|   584367.0| 10.438705|  1234419.0|     6476.0|2.79739296E8|         -0.206|         -0.02|        -0.1451|
|2021-05-24|       0.308|     0.0|     1.0| 7.7500001E15|   696056.0| 12.714748|  1312739.0|     6487.0|3.60541088E8|         -0.202|       -0.1017|          -0.22|
|2020-03-1

# **Sort Data Partition-Wise Problem**

In [None]:
df_sorted_Partitons = df_drop.sortWithinPartitions("Positive","return_etherium", ascending=[False,True])
df_sorted_Partitons.show()


+----------+------------+--------+--------+-------------+-----------+-----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|subjectivity|Negative|Positive|     avg_diff|act_address|    avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+--------+--------+-------------+-----------+-----------+-----------+-----------+------------+---------------+--------------+---------------+
|2021-01-03|        0.25|     0.0|     1.0|3.86999988E15|   584367.0|  10.438705|  1234419.0|     6476.0|2.79739296E8|         -0.206|         -0.02|        -0.1451|
|2021-05-24|       0.308|     0.0|     1.0| 7.7500001E15|   696056.0|  12.714748|  1312739.0|     6487.0|3.60541088E8|         -0.202|       -0.1017|          -0.22|
|2020-03-19|       0.108|     0.0|     1.0|2.19000007E15|   465615.0| 0.14119631|   864017.0|     6482.0| 1.7769096E8|        -0.1592|       -0.1539|        -0.1379|
|202

# **Remove Duplicate Records from a DataFrame**

In [None]:
df_noduplicate = df_drop.drop_duplicates()
df_noduplicate.show()

if df_drop.count() == df_noduplicate.count():
  print('No duplicate case')
else:
  print('Exist duplicate case')

+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|      Time|subjectivity|Negative|Positive|     avg_diff|act_address|   avg_gas|trans_count|block_count|  block_size|return_etherium|return_bitcoin|return_litecoin|
+----------+------------+--------+--------+-------------+-----------+----------+-----------+-----------+------------+---------------+--------------+---------------+
|2017-11-10|         0.0|     0.0|     0.0|1.42999997E15|   233966.0|0.19108772|   457427.0|     6300.0|  8.450068E7|         0.0723|        0.0794|         0.0845|
|2017-11-11|  0.27291667|     0.0|     1.0|1.43999999E15|   194696.0|0.18674426|   394378.0|     6267.0| 7.9370304E7|         -0.049|         0.041|        -0.0488|
|2017-11-12|         0.0|     0.0|     0.0|1.48999999E15|   213315.0|0.21059239|   434380.0|     6245.0| 8.2639584E7|          0.022|        0.0685|         0.0559|
|2017-11-1