In [392]:
from pymongo import MongoClient
import pandas as pd
import numpy as np 
import seaborn as sns
%matplotlib inline 
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pyspark.sql.types as T
import plotly as plt
import plotly.graph_objects as go
import pandas as pd
pd.set_option('display.float_format', lambda x: '%.4f' % x)

client = MongoClient('mongo',27017)

# Chargement des données mongo

In [364]:
# Handle on the Mongo database named "database".
db = client["database"]

# One collection handle per data source: CryptoPanic posts and the three price feeds.
postCryptoPanic = db["posts"]
postXRP = db["coursXRP"]
postBTC = db["coursBTC"]
postETH = db["coursETH"]

# Création de la session spark et connexion à mongo

In [365]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, configured with the Mongo input/output URIs
# and the mongo-spark-connector package.
# NOTE(review): this rebinds `db`, which an earlier cell used for the Mongo database.
db = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://mongo:27017/database.*")
    .config("spark.mongodb.output.uri", "mongodb://mongo:27017/database.*")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)


In [366]:
# Base URI of the Mongo database; the collection name is appended after a dot.
MONGO_DATABASE_URI = "mongodb://mongo:27017/database"


def load_collection(collection):
    """Load one Mongo collection as a Spark DataFrame.

    Args:
        collection: name of the collection inside the Mongo database.

    Returns:
        The DataFrame produced by the "mongo" reader for that collection.
    """
    collection_uri = f"{MONGO_DATABASE_URI}.{collection}"
    return db.read.format("mongo").option("uri", collection_uri).load()


dfXRP = load_collection("coursXRP")
dfBTC = load_collection("coursBTC")
dfETH = load_collection("coursETH")
dfPOST = load_collection("posts")

# Transformation des dataframes

## Timestamp, calcul de variation et data cleaning

In [380]:
def add_date_and_variation(prices):
    """Return `prices` with a `date` column and a `variation` column added.

    `date` is `Timestamp / 1000` passed through `from_unixtime`
    (assumes `Timestamp` is in epoch milliseconds — TODO confirm).
    `variation` is `(close - open) / open * 100`, rounded to 3 decimals.

    Args:
        prices: Spark DataFrame with `Timestamp`, `open` and `close` columns.

    Returns:
        A new Spark DataFrame; the input is not modified.
    """
    opening = F.col('open')
    closing = F.col('close')
    return (
        prices
        .withColumn('date', F.from_unixtime(F.col('Timestamp') / 1000))
        .withColumn('variation', F.round((closing - opening) / opening * 100, 3))
    )


new_dfXRP = add_date_and_variation(dfXRP)
new_dfBTC = add_date_and_variation(dfBTC)
new_dfETH = add_date_and_variation(dfETH)

# Posts: `published_at` is passed to from_unixtime as-is, without the /1000 scaling.
new_dfPOST = dfPOST.withColumn('date', F.from_unixtime(F.col('published_at')))

In [381]:
# Columns kept for the price frames and for the posts frame.
PRICE_COLUMNS = ("date", "open", "high", "low", "close", "volume", "variation")
POST_COLUMNS = ("date", "currencies", "title", "votes", "url")

# Project each frame onto the columns used in the rest of the notebook.
new_dfXRP = new_dfXRP.select(*PRICE_COLUMNS)
new_dfBTC = new_dfBTC.select(*PRICE_COLUMNS)
new_dfETH = new_dfETH.select(*PRICE_COLUMNS)
new_dfPOST = new_dfPOST.select(*POST_COLUMNS)

In [369]:
# Print the column names and types of the XRP price frame.
new_dfXRP.printSchema()

root
 |-- date: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- variation: double (nullable = true)



In [370]:
# Preview the first rows of the CryptoPanic posts frame.
new_dfPOST.show()

+-------------------+--------------------+--------------------+-----+--------------------+
|               date|          currencies|               title|votes|                 url|
+-------------------+--------------------+--------------------+-----+--------------------+
|2021-11-25 11:15:56|[{BTC, Bitcoin, b...|New German Chance...| null|https://cryptopan...|
|2021-11-25 11:08:31|[{BTC, Bitcoin, b...|Crypto market cap...| good|https://cryptopan...|
|2021-11-25 10:23:43|[{BTC, Bitcoin, b...|Bitcoin ‘solidifi...| good|https://cryptopan...|
|2021-11-25 10:10:57|[{BTC, Bitcoin, b...|Bitcoin Puts El S...| good|https://cryptopan...|
|2021-11-25 10:03:27|[{BTC, Bitcoin, b...|Number of Bitcoin...| good|https://cryptopan...|
|2021-11-25 09:15:20|[{BTC, Bitcoin, b...|Crypto poses no b...| good|https://cryptopan...|
|2021-11-25 08:04:54|[{BTC, Bitcoin, b...|Ethereum Poised T...| good|https://cryptopan...|
|2021-11-25 07:55:06|[{BTC, Bitcoin, b...|Bitcoin Processed...| good|https://cryptopan...|

### Exemples sur XRP

In [371]:
# Preview the first rows of the XRP price frame.
new_dfXRP.show()

+-------------------+------+------+------+------+---------+---------+
|               date|  open|  high|   low| close|   volume|variation|
+-------------------+------+------+------+------+---------+---------+
|2021-11-22 04:25:00|1.0394|1.0408|1.0372|1.0402| 834920.0|     0.08|
|2021-11-22 04:30:00|1.0402|1.0402|1.0359|1.0373| 600364.0|    -0.28|
|2021-11-22 04:35:00|1.0373|1.0395|1.0356| 1.039| 604846.0|     0.16|
|2021-11-22 04:40:00| 1.039|  1.04| 1.036|1.0362| 788089.0|    -0.27|
|2021-11-22 04:45:00|1.0362|1.0388|1.0357|1.0388| 675312.0|     0.25|
|2021-11-22 04:50:00|1.0388|1.0392|1.0327|1.0339| 914497.0|    -0.47|
|2021-11-22 04:55:00|1.0336|1.0361| 1.032|1.0338|1050190.0|     0.02|
|2021-11-22 05:00:00|1.0337|1.0351|1.0292|1.0306|1464764.0|     -0.3|
|2021-11-22 05:05:00|1.0305|1.0319|1.0262|1.0282|1629401.0|    -0.22|
|2021-11-22 05:10:00|1.0282|1.0312|1.0251| 1.031|1359418.0|     0.27|
|2021-11-22 05:15:00| 1.031|1.0329|  1.03|1.0329| 780156.0|     0.18|
|2021-11-22 05:20:00

# Exemples de filtres

In [372]:
# Keep only the rows whose volume exceeds one million.
# The predicate is built from new_dfXRP itself rather than from its parent dfXRP,
# so the filter does not rely on Spark resolving a column of another DataFrame.
new_dfXRP.where(new_dfXRP['volume'] > 1000000.0).show()

+-------------------+------+------+------+------+---------+---------+
|               date|  open|  high|   low| close|   volume|variation|
+-------------------+------+------+------+------+---------+---------+
|2021-11-22 04:55:00|1.0336|1.0361| 1.032|1.0338|1050190.0|     0.02|
|2021-11-22 05:00:00|1.0337|1.0351|1.0292|1.0306|1464764.0|     -0.3|
|2021-11-22 05:05:00|1.0305|1.0319|1.0262|1.0282|1629401.0|    -0.22|
|2021-11-22 05:10:00|1.0282|1.0312|1.0251| 1.031|1359418.0|     0.27|
|2021-11-22 05:20:00|1.0329| 1.039|1.0318|1.0372|2409701.0|     0.42|
|2021-11-22 05:25:00|1.0372|1.0382|1.0337|1.0343|1235675.0|    -0.28|
|2021-11-22 05:30:00|1.0345|1.0378|1.0342|1.0367|1137503.0|     0.21|
|2021-11-22 05:45:00|1.0372|1.0413|1.0364|1.0408|1006292.0|     0.35|
|2021-11-22 05:50:00|1.0409|1.0418|1.0392|1.0414|1128688.0|     0.05|
|2021-11-22 06:55:00|1.0409|1.0427|1.0405|1.0414|1051802.0|     0.05|
|2021-11-22 07:00:00|1.0414|1.0491|1.0411|1.0487|2217928.0|      0.7|
|2021-11-22 07:05:00

In [373]:
# Minimum and maximum of every column of the XRP price frame.
# `date` is a string column, so its min/max come from string comparison.
new_dfXRP.summary("min","max").show()

+-------+-------------------+------+------+------+------+---------+---------+
|summary|               date|  open|  high|   low| close|   volume|variation|
+-------+-------------------+------+------+------+------+---------+---------+
|    min|2021-11-22 04:25:00|1.0104|1.0139|1.0068|1.0103|  20345.0|    -0.98|
|    max|2021-11-25 15:45:00|1.0716|1.0735|  1.07|1.0716|9559760.0|     1.16|
+-------+-------------------+------+------+------+------+---------+---------+



In [374]:
# Rows whose `date` string begins with the day 2021-11-22.
is_on_selected_day = F.col('date').startswith("2021-11-22")
new_dfXRP.filter(is_on_selected_day).show()

+-------------------+------+------+------+------+---------+---------+
|               date|  open|  high|   low| close|   volume|variation|
+-------------------+------+------+------+------+---------+---------+
|2021-11-22 04:25:00|1.0394|1.0408|1.0372|1.0402| 834920.0|     0.08|
|2021-11-22 04:30:00|1.0402|1.0402|1.0359|1.0373| 600364.0|    -0.28|
|2021-11-22 04:35:00|1.0373|1.0395|1.0356| 1.039| 604846.0|     0.16|
|2021-11-22 04:40:00| 1.039|  1.04| 1.036|1.0362| 788089.0|    -0.27|
|2021-11-22 04:45:00|1.0362|1.0388|1.0357|1.0388| 675312.0|     0.25|
|2021-11-22 04:50:00|1.0388|1.0392|1.0327|1.0339| 914497.0|    -0.47|
|2021-11-22 04:55:00|1.0336|1.0361| 1.032|1.0338|1050190.0|     0.02|
|2021-11-22 05:00:00|1.0337|1.0351|1.0292|1.0306|1464764.0|     -0.3|
|2021-11-22 05:05:00|1.0305|1.0319|1.0262|1.0282|1629401.0|    -0.22|
|2021-11-22 05:10:00|1.0282|1.0312|1.0251| 1.031|1359418.0|     0.27|
|2021-11-22 05:15:00| 1.031|1.0329|  1.03|1.0329| 780156.0|     0.18|
|2021-11-22 05:20:00

#### Statistiques descriptives des clôtures et des volumes sur une période définie

In [375]:
# Inclusive bounds of the period, compared against the string `date` column.
period_start = '2021-11-22 08:00:00'
period_end = '2021-11-22 09:00:00'
date_column = F.col("date")

XRP_filtered_period = new_dfXRP.where((date_column >= period_start) & (date_column <= period_end))

# Descriptive statistics of close price and volume over that period.
XRP_filtered_period.describe('close', 'volume').show()

+-------+--------------------+------------------+
|summary|               close|            volume|
+-------+--------------------+------------------+
|  count|                  26|                26|
|   mean|  1.0447846153846156| 672391.3846153846|
| stddev|0.001112723616246...|309335.31857802166|
|    min|               1.043|          405122.0|
|    max|              1.0466|         1566116.0|
+-------+--------------------+------------------+



# DATAVIZ 

In [382]:
# Collect the four Spark frames to the driver as pandas DataFrames, in the order listed.
dfPOSTpanda, dfBTCpanda, dfXRPpanda, dfETHpanda = (
    spark_frame.toPandas()
    for spark_frame in (new_dfPOST, new_dfBTC, new_dfXRP, new_dfETH)
)

In [358]:
# Bounded preview of the posts frame instead of displaying the whole frame.
dfPOSTpanda.head(10)

Unnamed: 0,date,currencies,title,votes,url
0,2021-11-25 11:15:56,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",New German Chancellor Olaf Scholz Made A Serio...,,https://cryptopanic.com/news/13461232/New-Germ...
1,2021-11-25 11:08:31,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Crypto market cap rebounds as Metaverse tokens...,good,https://cryptopanic.com/news/13461209/Crypto-m...
2,2021-11-25 10:23:43,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Bitcoin ‘solidifies’ support at $58K as BTC pr...,good,https://cryptopanic.com/news/13461087/Bitcoin-...
3,2021-11-25 10:10:57,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Bitcoin Puts El Salvador On The Map As Tourism...,good,https://cryptopanic.com/news/13461066/Bitcoin-...
4,2021-11-25 10:03:27,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Number of Bitcoin wallets with non-zero balanc...,good,https://cryptopanic.com/news/13461039/Number-o...
5,2021-11-25 09:15:20,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...","Crypto poses no big risk to economy so far, Ba...",good,https://cryptopanic.com/news/13460875/Crypto-p...
6,2021-11-25 08:04:54,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Ethereum Poised To Dramatically Outperform Bit...,good,https://cryptopanic.com/news/13458994/Ethereum...
7,2021-11-25 07:55:06,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Bitcoin Processed 62% More Transactions Than P...,good,https://cryptopanic.com/news/13458759/Bitcoin-...
8,2021-11-25 01:20:00,"[(ETH, Ethereum, ethereum, https://cryptopanic...",Crypto.com Coin: Crypto Analytics Startup Issu...,good,https://cryptopanic.com/news/13451945/Cryptoco...
9,2021-11-24 22:20:41,"[(BTC, Bitcoin, bitcoin, https://cryptopanic.c...",Payments Giant Stripe Confirms It May Restart ...,good,https://cryptopanic.com/news/13451486/Payments...


## Filtre sur post Crypto Panic

## Récupération du cours de la crypto avant et après la parution d'un post

In [383]:
from pyspark.sql.functions import from_utc_timestamp

# For every CryptoPanic post tagged with XRP, print its publication date and title,
# then split the XRP price frame into the rows before and after that date.
# NOTE: both frames are reassigned on each matching post, so after the loop they
# correspond to the last XRP post met in dfPOSTpanda's row order.
for post in dfPOSTpanda.itertuples():
    # A post with no currency list cannot be iterated; skip it.
    if post.currencies is None:
        continue
    for currency in post.currencies:
        if currency["code"] == "XRP":
            print(post.date)
            print(post.title)
            # `date` is a 'yyyy-MM-dd HH:mm:ss' string on both sides, so string
            # comparison follows chronological order. Rows stamped exactly at the
            # publication date fall in neither frame.
            afterPublicationXRP = new_dfXRP.where(new_dfXRP['date'] > post.date)
            beforePublicationXRP = new_dfXRP.where(new_dfXRP['date'] < post.date)

2021-11-24 22:08:17
Ripple co-founder Jed McCaleb hasn’t sold XRP in 3 months, still holds $730M


## Impact du post sur le cours de la crypto

In [384]:
# Descriptive statistics of volume and close price for the rows dated before the post.
beforePublicationXRP.describe("volume","close").show()

+-------+------------------+-------------------+
|summary|            volume|              close|
+-------+------------------+-------------------+
|  count|               423|                423|
|   mean|1144451.1749408983|  1.037771631205674|
| stddev| 651087.7989710014|0.00912768036001847|
|    min|           20345.0|             1.0227|
|    max|         5684606.0|             1.0571|
+-------+------------------+-------------------+



In [325]:
# Descriptive statistics of volume and close price for the rows dated after the post.
afterPublicationXRP.describe("volume","close").show()

+-------+------------------+--------------------+
|summary|            volume|               close|
+-------+------------------+--------------------+
|  count|              1577|                1577|
|   mean|1145260.8560558022|  1.0422078630310727|
| stddev| 779425.5033600159|0.011288860196204576|
|    min|          247433.0|              1.0103|
|    max|         9559760.0|              1.0716|
+-------+------------------+--------------------+



## Visualisation sur Pandas et plotly

In [387]:
# Convert the before/after Spark frames to pandas for plotting (before first, then after).
beforePublicationXRPpd, afterPublicationXRPpd = (
    beforePublicationXRP.toPandas(),
    afterPublicationXRP.toPandas(),
)

In [395]:
# Interactive candlestick chart of the XRP prices dated before the post.
candlesBefore = go.Candlestick(
    x=beforePublicationXRPpd['date'],
    open=beforePublicationXRPpd['open'],
    high=beforePublicationXRPpd['high'],
    low=beforePublicationXRPpd['low'],
    close=beforePublicationXRPpd['close'],
)
figBefore = go.Figure(data=[candlesBefore])

# Title and axis labels so the figure can be read on its own.
figBefore.update_layout(
    title="XRP : cours avant la publication du post",
    xaxis_title="Date",
    yaxis_title="Cours",
)
figBefore.show()

In [None]:
# Interactive candlestick chart of the XRP prices dated after the post.
candlesAfter = go.Candlestick(
    x=afterPublicationXRPpd['date'],
    open=afterPublicationXRPpd['open'],
    high=afterPublicationXRPpd['high'],
    low=afterPublicationXRPpd['low'],
    close=afterPublicationXRPpd['close'],
)
figAfter = go.Figure(data=[candlesAfter])

# Title and axis labels so the figure can be read on its own.
figAfter.update_layout(
    title="XRP : cours après la publication du post",
    xaxis_title="Date",
    yaxis_title="Cours",
)
figAfter.show()