In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

In [2]:
import findspark

# 初始化 findspark 并指向 EMR 的 Spark 安装路径
findspark.init('/usr/lib/spark')

# 确认是否正确初始化
print("findspark initialized:", findspark.find())

import findspark
findspark.init("/usr/lib/spark")  # 替换为 Spark 的实际路径

findspark initialized: /usr/lib/spark


In [3]:
import os

# 配置环境变量
os.environ["SPARK_HOME"] = "/usr/lib/spark"
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EMR CSV Export").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/23 17:34:37 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/11/23 17:34:46 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [5]:
# 读取数据
s3_file_path = "s3a://dcu-dmv-bucket/Bitcoin_tweets.csv"

# 读取 S3 数据
df = spark.read.csv(s3_file_path, header=True, inferSchema=True)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
24/11/23 17:34:47 WARN ConfigurationHelper: Option fs.s3a.connection.establish.timeout is too low (5,000 ms). Setting to 15,000 ms instead
                                                                                

In [6]:
#df.drop(['user_name','user_location','user_description','user_created','user_friends','user_favourites','user_verified','source','is_retweet'],axis=1,inplace=True)
# 假设原始 PySpark DataFrame 名为 df

columns_to_drop = ['user_name', 'user_location', 'user_description', 
                   'user_created', 'user_friends', 'user_favourites', 
                   'user_verified', 'source', 'is_retweet']

# 删除指定列
df = df.drop(*columns_to_drop)

In [7]:
from pyspark.sql.functions import lower, col
df = df.withColumn("hashtags", lower(col("hashtags")))  # 转换为小写

In [8]:
# 筛选包含特定关键字的行
df_filtered = df.filter(
    col("hashtags").rlike(r"\b(btc|bitcoin|bitcoins)\b")
)

In [9]:
# df.shape
def get_shape(spark_df):
    return (spark_df.count(), len(spark_df.columns))

shape = get_shape(df_filtered)
print(f"Shape: {shape}")



Shape: (1400452, 4)


                                                                                

In [10]:
df_filtered = df_filtered.drop('hashtags')

In [11]:
from pyspark.sql.functions import date_format, to_timestamp

# 将日期字符串转换为新的解析器兼容的格式
df_filtered = df_filtered.withColumn("date", to_timestamp("date", "yyyy-MM-dd HH:mm:ss"))

In [12]:
df_filtered = df_filtered.dropna()

In [13]:
from pyspark.sql.functions import col
# 定义开始日期和结束日期
start_date = '2022-01-14'
end_date = '2022-04-14'
# 按条件筛选行
df_filtered = df_filtered.filter((col("date") > start_date) & (col("date") <= end_date))

# 显示筛选后的结果
df_filtered.show()



+--------------+-------------------+--------------------+
|user_followers|               date|                text|
+--------------+-------------------+--------------------+
|         511.0|2022-01-14 23:59:59|Death Cross #bitc...|
|          26.0|2022-01-14 23:59:56|#bitcoin is revol...|
|          94.0|2022-01-14 23:59:56|Teaser! #Bitcoin ...|
|          71.0|2022-01-14 23:59:43|#Bitcoin $BTC Nic...|
|         152.0|2022-01-14 23:59:42|👋 Hey! Wait! hel...|
|           0.0|2022-01-14 23:59:20|#Bitcoin is a vol...|
|         206.0|2022-01-14 23:59:14|4 hour top movers...|
|           6.0|2022-01-14 23:58:41|Bullishnewfie fou...|
|           0.0|2022-01-14 23:58:10|u rn on way to bu...|
|         477.0|2022-01-14 23:57:50|Current #Bitcoin ...|
|           7.0|2022-01-14 23:57:46|7% inflation? Bal...|
|          50.0|2022-01-14 23:57:31|@DoombergT #bitco...|
|       76481.0|2022-01-14 23:57:07|#linkedin #twitte...|
|         230.0|2022-01-14 23:57:05|👋 A new block wa...|
|         178.0|

                                                                                

In [14]:
# 筛选 user_followers > 10 的行
df_filtered = df_filtered.filter(col("user_followers") > 10)

In [15]:
shape = get_shape(df_filtered)
print(f"Shape: {shape}")



Shape: (152763, 3)


                                                                                

In [16]:
import re

In [17]:
from pyspark.sql.functions import lower, regexp_replace, split, size
df_filtered = df_filtered.withColumn("text", lower(col("text")))  # 转换为小写
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), "@[A-Za-z0-9_]+", ""))  # 删除用户名
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), "#[A-Za-z0-9_]+", ""))  # 删除话题标签
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), r"http\S+", ""))  # 删除 URL
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), r"www.\S+", ""))  # 删除以 www. 开头的 URL
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), r"[()!?]", " "))  # 替换标点符号
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), r"\[.*?\]", " "))  # 删除方括号内容
df_filtered = df_filtered.withColumn("text", regexp_replace(col("text"), r"[^a-z0-9]", " "))  # 替换非字母数字字符为空格

In [18]:
# 筛选 text 列中单词数大于 3 的行
df2 = df_filtered.filter(size(split(col("text"), " ")) > 3)
df2 = df2.drop('user_followers')
# 显示结果
df2.show()



+-------------------+--------------------+
|               date|                text|
+-------------------+--------------------+
|2022-01-14 23:59:59| death cross  dump  |
|2022-01-14 23:59:56| is revolutionizi...|
|2022-01-14 23:59:56|         teaser     |
|2022-01-14 23:59:43|  btc nice bounce...|
|2022-01-14 23:59:42|  hey  wait  help...|
|2022-01-14 23:59:14|4 hour top movers...|
|2022-01-14 23:57:50|current  price is...|
|2022-01-14 23:57:31|      fixes this    |
|2022-01-14 23:57:07|                 ...|
|2022-01-14 23:57:05|  a new block was...|
|2022-01-14 23:56:31|traders say  run ...|
|2022-01-14 23:56:25|   bitcoin will r...|
|2022-01-14 23:56:05|bpod found  in a ...|
|2022-01-14 23:55:46|       is really ...|
|2022-01-14 23:55:33|o melveny  dla pi...|
|2022-01-14 23:55:31|this video will w...|
|2022-01-14 23:55:26|          fixed this|
|2022-01-14 23:55:17|this video will w...|
|2022-01-14 23:55:09|this video will w...|
|2022-01-14 23:54:52|about 70  of  in ...|
+----------

                                                                                

In [19]:
shape = get_shape(df2)
print(f"Shape: {shape}")



Shape: (152086, 2)


                                                                                

In [20]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import re

  from .autonotebook import tqdm as notebook_tqdm


In [21]:
# tokenizer = AutoTokenizer.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")
# model = AutoModelForSequenceClassification.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")

In [22]:
# def sentiment_score(text):
#     tokens = tokenizer.encode(text, return_tensors="pt")
#     result = model(tokens)
#     return int(torch.argmax(result.logits)) - 2

In [23]:
# text = 'bitcoin just ok'
# sentiment_score(text)

In [24]:
# pandas_df = df2.toPandas()

In [25]:
# pandas_df.shape

In [26]:
# print('hello')

In [27]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 加载 BERT 模型和分词器
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

tokenizer = AutoTokenizer.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")
model = AutoModelForSequenceClassification.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")

# 定义情感分析函数
def sentiment_score(text):
    tokens = tokenizer.encode(text[:512], return_tensors="pt")
    result = model(tokens)
    return int(torch.argmax(result.logits)) - 2

# 将函数注册为 UDF
sentiment_udf = udf(sentiment_score, IntegerType())

# 创建新的 Spark DataFrame 列，添加 sentiment 列
df2 = df2.withColumn("sentiment", sentiment_udf(df2["text"]))

# 显示结果
df2.show()




+-------------------+--------------------+---------+
|               date|                text|sentiment|
+-------------------+--------------------+---------+
|2022-01-14 23:59:59| death cross  dump  |       -2|
|2022-01-14 23:59:56| is revolutionizi...|        2|
|2022-01-14 23:59:56|         teaser     |        0|
|2022-01-14 23:59:43|  btc nice bounce...|        0|
|2022-01-14 23:59:42|  hey  wait  help...|        2|
|2022-01-14 23:59:14|4 hour top movers...|        2|
|2022-01-14 23:57:50|current  price is...|        2|
|2022-01-14 23:57:31|      fixes this    |        2|
|2022-01-14 23:57:07|                 ...|        2|
|2022-01-14 23:57:05|  a new block was...|       -2|
|2022-01-14 23:56:31|traders say  run ...|       -2|
|2022-01-14 23:56:25|   bitcoin will r...|        2|
|2022-01-14 23:56:05|bpod found  in a ...|        2|
|2022-01-14 23:55:46|       is really ...|       -2|
|2022-01-14 23:55:33|o melveny  dla pi...|        2|
|2022-01-14 23:55:31|this video will w...|    

                                                                                

In [28]:
df3 = df2.drop('text')

In [29]:
df3.show()



+-------------------+---------+
|               date|sentiment|
+-------------------+---------+
|2022-01-14 23:59:59|       -2|
|2022-01-14 23:59:56|        2|
|2022-01-14 23:59:56|        0|
|2022-01-14 23:59:43|        0|
|2022-01-14 23:59:42|        2|
|2022-01-14 23:59:14|        2|
|2022-01-14 23:57:50|        2|
|2022-01-14 23:57:31|        2|
|2022-01-14 23:57:07|        2|
|2022-01-14 23:57:05|       -2|
|2022-01-14 23:56:31|       -2|
|2022-01-14 23:56:25|        2|
|2022-01-14 23:56:05|        2|
|2022-01-14 23:55:46|       -2|
|2022-01-14 23:55:33|        2|
|2022-01-14 23:55:31|        2|
|2022-01-14 23:55:26|        2|
|2022-01-14 23:55:17|        2|
|2022-01-14 23:55:09|        2|
|2022-01-14 23:54:52|       -2|
+-------------------+---------+
only showing top 20 rows



                                                                                

In [30]:
#pandas_df = df2.toPandas()

In [None]:
# 将 DataFrame 写入 S3，使用 gzip 压缩
df3.write.option("header", "true") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .csv("s3a://dcu-dmv-bucket/no_text/")



In [31]:
# # 将 df2 写入到 S3 的指定路径
# df2.write.csv("s3a://dcu-dmv-bucket/with_text", 
#               header=True, mode="overwrite")


In [32]:
# pandas_df['sentiment'] = pandas_df['text'].apply(lambda x: sentiment_score(x[:512]))

24/11/23 17:38:24 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_21_python !
