Analisi del consenso sul Bitcoin

Sei stato reclutato da un'azienda di ricerche di mercato per stimare il consenso delle persone verso il Bitcoin. Un team di data engineer ha estratto diversi milioni di tweet che parlano di Bitcoin, il tuo compito è quello di eseguire un'analisi del sentiment e creare un grafico che mostri come questo è variato giorno per giorno. Utilizza anche i dati per rispondere a domande interessanti, come:

i tweet negativi hanno avuto più likes rispetto a quelli positivi?

i tweet negativi hanno avuto più interazioni (risposte) rispetto a quelli positivi?

BONUS
Verifica se la variazione del sentiment è associata ad una variazione del valore del Bitcoin, per svolgere questa task devi reperire in autonomia lo storico di BTC USD

In [0]:
#Installazione e importazione delle librerie
%pip install langdetect TextBlob nltk
import nltk
nltk.download("popular", quiet=True)
nltk.download('stopwords', quiet=True)

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/981.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.4/981.5 kB[0m [31m1.7 MB/s[0m eta [36m0:00:01[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━[0m [32m706.6/981.5 kB[0m [31m9.5 MB/s[0m eta [36m0:00:01[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting TextBlob
  Obtaining dependency information for TextBlob from https://files.pythonhosted.org/packages/02/07/5fd2945356dd839974d3a25de8a142dc37293c21315729a41e775b5f35

True

In [0]:

#Importazione del dataset
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/bitcoin_tweets.csv

import pandas as pd

dataset = pd.read_csv('/databricks/driver/bitcoin_tweets.csv', delimiter=",")
dataset = dataset[["timestamp","replies","likes","text"]] #selezione delle colonne interessate
dataset.dropna(subset = ["text"],inplace = True) 
dataset["timestamp"] = pd.to_datetime(dataset["timestamp"], format="%Y-%m-%d %H:%M:%S%z").dt.date
dataset = dataset.rename(columns={"timestamp": "date"}) #conversione in datetime

spark_df = spark.createDataFrame(dataset)  #conversione a spark

--2024-06-22 22:20:22--  https://proai-datasets.s3.eu-west-3.amazonaws.com/bitcoin_tweets.csv
Resolving proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)... 3.5.224.103, 16.12.20.42
Connecting to proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)|3.5.224.103|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24708828 (24M) [text/csv]
Saving to: ‘bitcoin_tweets.csv’


2024-06-22 22:20:26 (7.68 MB/s) - ‘bitcoin_tweets.csv’ saved [24708828/24708828]



In [0]:

# Rilevamento della lingua nel testo
def detect_language(text):
    try:
        return detect(text)
    except LangDetectException:
        return "unknown"
    
# Pulizia testo e rimozione delle stopwords
from textblob import TextBlob
from nltk.corpus import stopwords
stopwords = set(stopwords.words('english'))

def clean_stopwords(text):
    tokens = TextBlob(text).words
    cleaned_words = [word for word in tokens if word.lower() not in stopwords]
    cleaned_text = ' '.join(cleaned_words)
    return cleaned_text

In [0]:

#Importazione delle funzioni e dell'udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType
from langdetect import detect, LangDetectException

#definizione dell'user defined function per il rilevamento della lingua.
clean_stopwords_udf = udf(clean_stopwords, StringType())
detect_language_udf = udf(detect_language, StringType())

#applicazione alle colonne del df
spark_df = spark_df.withColumn("clean_text", clean_stopwords_udf(spark_df["text"]))
spark_df = spark_df.withColumn("language", detect_language_udf(spark_df["clean_text"]))

#filtro per l'inglese
spark_df = spark_df.filter(spark_df["language"] == "en")

#rimozione colonne temporanee
spark_df = spark_df.drop("language", "clean_text")

In [0]:
#analisi del sentiment
def get_sentiment(text):
    return TextBlob(text).sentiment.polarity

#uso dell'udf per l'analisi del sentiment
udf_clean_stopwords = udf(clean_stopwords, StringType())
udf_sentiment = udf(get_sentiment, FloatType())

spark_df = spark_df.withColumn("cleaned_text", udf_clean_stopwords(spark_df["text"]))
spark_df = spark_df.withColumn("sentiment", udf_sentiment(spark_df["cleaned_text"]))

#rimozione temporanea delle colonne
spark_df = spark_df.drop("text","cleaned_text")

#creazione
spark_df.createOrReplaceTempView("bitcoin_tweets_sentiment")

In [0]:

#importazione
from pyspark.sql import functions as F

#limite del df per eccessivi dati
df = spark.sql("""select date, sentiment from bitcoin_tweets_sentiment""").limit(100000)

#aggregazione del sentiment
daily_sentiment = df.groupBy("date").agg(F.avg("sentiment").alias("avg_sentiment"))

#classificazione del sentimenti
def sentiment_label(avg_sentiment):
    if avg_sentiment > 0.1:
        return "Positive"
    elif avg_sentiment < -0.1:
        return "Negative"
    else:
        return "Neutral"
    
udf_label = udf(sentiment_label, StringType())
daily_sentiment = daily_sentiment.withColumn("sentiment_label", udf_label("avg_sentiment"))
daily_sentiment = daily_sentiment.drop("sentiment") 

daily_sentiment.createOrReplaceTempView("daily_sentiment_tempview")
display(daily_sentiment) 


date,avg_sentiment,sentiment_label
2019-05-27,0.096307688722484,Neutral
2019-05-21,0.1716666668653488,Positive
2019-05-22,0.1386904716491699,Positive
2019-05-26,0.164063972171288,Positive
2009-01-11,0.0,Neutral
2019-05-02,0.1565728733936945,Positive
2019-05-25,0.1613984225051743,Positive
2019-03-12,0.1722796113992279,Positive
2019-05-23,0.0333333325882752,Neutral
2019-05-24,0.1377976228083883,Positive


In [0]:
#impostazione del limite sul df
df = spark.sql(""" select sentiment, likes, replies from bitcoin_tweets_sentiment""").limit(100000)

#filtro sentiment
filtered_df = df.filter(df.sentiment.isNotNull())

# Definisce e applica la UDF per classificare il sentiment
def sentiment_label(sentiment):
    if sentiment > 0:
        return "positive"
    elif sentiment < 0:
        return "negative"
    else:
        return "neutral"

udf_label = udf(sentiment_label, StringType())
df = df.withColumn("sentiment_label", udf_label("sentiment"))

# Filtra per sentiment positivo e negativo
df = df.filter(df["sentiment_label"].isin("positive", "negative"))

# Raggruppa per sentiment e calcola la somma di likes e replies
pivot_df = df.groupBy("sentiment_label").agg(
    F.sum("likes").alias("total_likes"),
    F.sum("replies").alias("total_replies")
)

# Visualizza i risultati
display(pivot_df)

sentiment_label,total_likes,total_replies
positive,296055.0,41821.0
negative,95606.0,10890.0


Dalla tabella esposta notiamo che i commenti rilevati come positivi hanno avuto maggiori like ed interazioni rispetto a quelli negativi

In [0]:

#importazione del df con i dati storici del bitcoin
!wget https://raw.githubusercontent.com/Alex-Gnn2813/Progetti/main/BTC_USD%20Bitfinex%20Dati%20Storici.csv

#conversione df in spark e processamento
var_bitcoin = pd.read_csv('/databricks/driver/BTC_USD Bitfinex Dati Storici.csv', delimiter=",", header=0)
var_bitcoin = var_bitcoin[["Data","Var. %"]]
var_bitcoin["Var. %"] = var_bitcoin["Var. %"].str.replace('%', '').str.replace(',', '.').astype(float)
var_bitcoin['Data'] = pd.to_datetime(var_bitcoin['Data'], format='%d.%m.%Y').dt.date

spark_df = spark.createDataFrame(var_bitcoin)
spark_df.createTempView("bitcoin_variation")

--2024-06-22 22:42:50--  https://raw.githubusercontent.com/Alex-Gnn2813/Progetti/main/BTC_USD%20Bitfinex%20Dati%20Storici.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 167471 (164K) [text/plain]
Saving to: ‘BTC_USD Bitfinex Dati Storici.csv’


2024-06-22 22:42:51 (4.69 MB/s) - ‘BTC_USD Bitfinex Dati Storici.csv’ saved [167471/167471]



In [0]:

#unione e filtraggio dei df per vedere una possibile correlazione
joined_df = spark.sql("""
select d.date, d.sentiment_label, s.`Var. %`as `Var_%`
from daily_sentiment_tempview d 
left join bitcoin_variation s on d.date = s.Data
""")
joined_df = joined_df.dropna(subset=["Var_%"])

display(joined_df)

date,sentiment_label,Var_%
2019-05-27,Neutral,0.87
2019-05-21,Positive,-0.6
2019-05-22,Positive,-3.98
2019-05-26,Positive,8.38
2019-05-02,Positive,1.67
2019-05-25,Positive,0.75
2019-03-12,Positive,0.13
2019-05-23,Neutral,3.3
2019-05-24,Positive,1.56
2019-05-10,Neutral,2.88


In [0]:

#importazione e filtraggio
from pyspark.sql.functions import when

joined_df = joined_df.filter(joined_df["sentiment_label"] != "neutral")
joined_df = joined_df.withColumn("sentiment_binary", when(joined_df["sentiment_label"] == "positive", 1).otherwise(0))

joined_df_pandas = joined_df.select("sentiment_binary", "Var_%").toPandas()

In [0]:

# Verifica se i dati sono costanti
sentiment_unique_count = joined_df_pandas["sentiment_binary"].nunique()
var_unique_count = joined_df_pandas["Var_%"].nunique()

print(f"Valori unici in 'sentiment_binary': {sentiment_unique_count}")
print(f"Valori unici in 'Var_%': {var_unique_count}")

# Se uno dei seguenti conteggi è 1, significa che i dati sono costanti
if sentiment_unique_count <= 1 or var_unique_count <= 1:
    print("I dati sono costanti, non è possibile calcolare la correlazione.")
else:
    from scipy.stats import pointbiserialr

    # Calcola la correlazione point-biserial
    correlation, p_value = pointbiserialr(joined_df_pandas["sentiment_binary"], joined_df_pandas["Var_%"])
    print(f"Correlazione Point-Biserial: {correlation}")
    print(f"P-value: {p_value}")

Valori unici in 'sentiment_binary': 1
Valori unici in 'Var_%': 409
I dati sono costanti, non è possibile calcolare la correlazione.


non è presente alcuna correlazione che associ la variazione del sentiment ad una variazione del valore del Bitcoin.