# Cài Spark, load dữ liệu Ukraine tweets (~40GB), clean rồi lưu Parquet


In [None]:
import subprocess, urllib.request, os
from pathlib import Path

# winutils.exe and hadoop.dll are required for Spark to run on Windows.
hadoop_bin = Path(r"C:\hadoop\bin")
hadoop_bin.mkdir(parents=True, exist_ok=True)

for name, url in [
    ("winutils.exe", "https://github.com/cdarlint/winutils/raw/master/hadoop-3.3.6/bin/winutils.exe"),
    ("hadoop.dll",   "https://github.com/cdarlint/winutils/raw/master/hadoop-3.3.6/bin/hadoop.dll"),
]:
    f = hadoop_bin / name
    if f.exists():
        continue
    print(f"downloading {name}...")
    # Fetch into a temporary sibling and rename only after the transfer
    # completes, so an interrupted download never leaves a truncated file
    # that the exists() check above would accept on the next run.
    _partial = f.with_name(f.name + ".part")
    try:
        urllib.request.urlretrieve(url, _partial)
        _partial.replace(f)
    finally:
        if _partial.exists():
            _partial.unlink()

# Persist HADOOP_HOME for future processes via setx. The exit code is checked
# so a failed call is reported instead of being hidden behind "done".
_setx = subprocess.run(["setx", "HADOOP_HOME", r"C:\hadoop"], capture_output=True)
if _setx.returncode == 0:
    print("done")
else:
    # Output is kept as bytes and decoded leniently: the console code page
    # may not match Python's default text encoding.
    _detail = (_setx.stderr or _setx.stdout or b"").decode(errors="replace").strip()
    print(f"setx HADOOP_HOME failed (exit code {_setx.returncode}): {_detail}")


setx HADOOP_HOME: SUCCESS: Specified value was saved.
HADOOP_HOME (system) = ''

⚠ Chạy PowerShell as Administrator rồi chạy lệnh:
  [System.Environment]::SetEnvironmentVariable("HADOOP_HOME","C:\hadoop","Machine")
  Sau đó restart Jupyter.


In [None]:
import os
# Expose the Hadoop shim to this process: set HADOOP_HOME and put its bin
# directory at the front of PATH (Windows ';' separator).
os.environ.update(
    {
        "HADOOP_HOME": r"C:\hadoop",
        "PATH": "".join((r"C:\hadoop\bin;", os.environ.get("PATH", ""))),
    }
)

from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Input CSV directory and the output location of the cleaned Parquet dataset.
DATA_DIR = Path(r"F:\UK-Russia\Data")
OUT_DIR = Path(r"F:\UK-Russia\jupyter\output")
PARQUET_OUT = OUT_DIR / "tweets_clean.parquet"
OUT_DIR.mkdir(parents=True, exist_ok=True)


def _build_spark_session():
    """Create (or reuse) the ``ukraine_etl`` Spark session with the ETL settings."""
    settings = {
        "spark.driver.memory": "8g",
        "spark.executor.memory": "8g",
        "spark.sql.shuffle.partitions": "40",
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.parquet.compression.codec": "snappy",
    }
    builder = SparkSession.builder.appName("ukraine_etl")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


spark = _build_spark_session()
spark.sparkContext.setLogLevel("WARN")
print(spark.version)


4.1.1


## ETL


In [None]:
# Quick check of the columns in the first CSV file (by sorted path).
_first_csv = sorted(DATA_DIR.glob("*_UkraineCombinedTweetsDeduped.csv"))[0]
sample = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .csv(str(_first_csv))
    .limit(3)
)

print(sample.columns)
sample.select("tweetid", "text", "language").show(3, truncate=60)


Columns: ['_c0', 'userid', 'username', 'acctdesc', 'location', 'following', 'followers', 'totaltweets', 'usercreatedts', 'tweetid', 'tweetcreatedts', 'retweetcount', 'text', 'hashtags', 'language', 'coordinates', 'favorite_count', 'is_retweet', 'original_tweet_id', 'original_tweet_userid', 'original_tweet_username', 'in_reply_to_status_id', 'in_reply_to_user_id', 'in_reply_to_screen_name', 'is_quote_status', 'quoted_status_id', 'quoted_status_userid', 'quoted_status_username', 'extractedts']

Total columns: 29
+-------------------+------------------------------------------------------------+--------+------------------------------------------------------------+
|            tweetid|                                                        text|language|                                                    hashtags|
+-------------------+------------------------------------------------------------+--------+------------------------------------------------------------+
|1560416252937617411|Dear

In [None]:
import threading, time, requests
from pyspark.sql.types import LongType, BooleanType

csv_files = [
    str(path)
    for path in sorted(DATA_DIR.glob("*_UkraineCombinedTweetsDeduped.csv"))
]
print(f"{len(csv_files)} files")

# Read all CSV files in one pass; quoted fields may span several lines.
_raw = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .option("mode", "PERMISSIVE")
    .csv(csv_files)
)

# Keep only the columns needed downstream, casting ids/counters/flags.
_selected = _raw.select(
    F.col("tweetid").cast(LongType()),
    F.col("text"),
    F.col("language"),
    F.col("username"),
    F.col("retweetcount").cast(LongType()),
    F.col("is_retweet").cast(BooleanType()),
    F.col("tweetcreatedts"),
    F.col("hashtags"),
    F.col("followers").cast(LongType()),
)

# Rows must have a tweet id that survived the cast and be tagged English.
_has_id_and_english = F.col("tweetid").isNotNull() & (F.col("language") == "en")

# Remove the run of hashtags at the very end of the text, then trim.
_text_without_trailing_tags = F.trim(
    F.regexp_replace("text", r"(\s*#\w+)+\s*$", "")
)

df = (
    _selected
    .filter(_has_id_and_english)
    .withColumn("text", _text_without_trailing_tags)
    .filter(F.length("text") > 0)
    .dropDuplicates(["tweetid"])
)

# Small progress monitor.
# Shared stop flag: the monitor loop runs until done["v"] becomes truthy.
done = {"v": False}


def _progress_bar(n, t, width=20):
    """Return a ``width``-cell text bar with ``n`` of ``t`` tasks shown as filled."""
    # Integer arithmetic avoids float rounding at cell boundaries; the clamp
    # keeps the bar exactly ``width`` cells even for inconsistent counts.
    filled = width * n // t if t > 0 else 0
    filled = max(0, min(width, filled))
    return "\u2588" * filled + "\u2591" * (width - filled)


def _monitor():
    """Poll the Spark REST API and print progress of the first active stage.

    Runs until ``done["v"]`` is truthy, polling every two seconds. Any error
    while polling is ignored so that monitoring never interferes with the job.
    """
    sc = spark.sparkContext
    app = sc.applicationId
    # Ask Spark where its UI actually lives: the port is not always 4040
    # (Spark moves to the next free port when 4040 is taken). Fall back to the
    # default address when no UI URL is reported.
    base = sc.uiWebUrl or "http://localhost:4040"
    prev = None
    while not done["v"]:
        try:
            stages = [s for s in requests.get(
                f"{base}/api/v1/applications/{app}/stages", timeout=2
            ).json() if s["status"] == "ACTIVE"]
            if stages:
                s = stages[0]
                n, t = s["numCompleteTasks"], s["numTasks"]
                # Track the stage id too, so a new stage is redrawn even when
                # its completed-task count equals the last one printed.
                current = (s["stageId"], n)
                if current != prev:
                    print(f"\r  stage {s['stageId']}  {_progress_bar(n, t)}  {n}/{t}", end="", flush=True)
                    prev = current
        except Exception:
            # Best-effort display: the UI may not be up yet or may return an
            # unexpected payload. Unlike a bare ``except``, this does not
            # swallow SystemExit or KeyboardInterrupt.
            pass
        time.sleep(2)

# Run the progress monitor in a daemon thread for the duration of the write.
_monitor_thread = threading.Thread(target=_monitor, daemon=True)
_monitor_thread.start()
try:
    df.write.mode("overwrite").parquet(str(PARQUET_OUT))
finally:
    # Always signal the monitor to stop, even when the write fails; otherwise
    # the thread keeps polling and printing for the rest of the session.
    done["v"] = True
    # The monitor only re-checks the flag between polls, so wait briefly for
    # it to exit and avoid a stray progress line after the final message.
    _monitor_thread.join(timeout=5)
print(f"\ndone \u2192 {PARQUET_OUT}")


Processing 291 / 291 files
Stage 15 |███████████████████░| 38/40 (95%)
Done → F:\UK-Russia\jupyter\output\tweets_clean.parquet


In [None]:
# Reload the written Parquet output to verify it: row count plus a preview.
result = spark.read.parquet(str(PARQUET_OUT))
_row_total = result.count()
print(f"{_row_total:,} rows")
result.show(n=5, truncate=80)


Total clean rows: 11,099,751
+-------------------+--------------------------------------------------------------------------------+--------+--------------+------------+----------+-------------------+--------------------------------------------------------------------------------+---------+
|            tweetid|                                                                            text|language|      username|retweetcount|is_retweet|     tweetcreatedts|                                                                        hashtags|followers|
+-------------------+--------------------------------------------------------------------------------+--------+--------------+------------+----------+-------------------+--------------------------------------------------------------------------------+---------+
|1560416650746142721|@Kenyans No one wants to be associated with dictators in #Russia. That‚Äôs a cr...|      en| logic19827474|           0|     false|2022-08-19 00:01:35|[{'text': 'Ru