In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, when, mean
from pyspark.sql.functions import regexp_replace

In [3]:
# Create (or reuse) the SparkSession on the standalone cluster. Resource limits
# are kept small: 1 core and 1G heap per executor, 4G for the driver.
spark = (
    SparkSession.builder.appName("group08")
    .master("spark://<SPARK_IP>:7077")
    .config("spark.jars", "/opt/spark/jars/gcs-connector-latest-hadoop2.jar")  # GCS filesystem connector jar
    .config("spark.executor.memory", "1G")  # heap size per executor
    .config("spark.driver.memory", "4G")
    .config("spark.debug.maxToStringFields", "1000000")  # avoid truncated plan strings for wide frames
    .config("spark.executor.cores", "1")  # cores per executor (not a cluster-wide total)
    .config("spark.python.worker.memory", "1G")  # per Python worker, during aggregation, before spilling
    .config("spark.driver.maxResultSize", "3G")  # cap on data collected back to the driver
    .config("spark.kryoserializer.buffer.max", "1024M")
    .config("spark.port.maxRetries", "100")  # allow UI/ports to skip past ones already in use
    .getOrCreate()
)

# Configure the GCS connector. All three keys go on the Hadoop configuration,
# which is where the connector reads them; the keyfile was previously set on
# the session SQL conf (spark.conf), unlike its two siblings.
_hadoop_conf = spark._jsc.hadoopConfiguration()
_hadoop_conf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
_hadoop_conf.set('fs.gs.auth.service.account.enable', 'true')
_hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "path_to_CREDENTIAL_FILE")

23/12/21 18:32:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/21 18:32:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/12/21 18:32:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
def data_cleansing(i, kols_path="path_to_kols_table.csv", tweets_path="path_to_tweets_table.csv"):
    """Load the KOL and tweet CSVs, clean them, and return per-user tweet text.

    Steps:
      1. Drop KOL columns that are null in every row.
      2. Mean-impute nulls in KOL columns 6..12 (by position) and cast them to int.
      3. Add an ``is_verified_num`` 0/1 flag derived from ``is_verified``
         (it is not part of the returned frame).
      4. Inner-join KOLs to tweets on ``user_id == author_id`` and drop fully
         duplicated joined rows.
      5. Remove every character from ``tweet_body`` that is not an ASCII
         letter, digit or whitespace.

    Args:
        i: Dataset identifier. Currently unused; presumably meant to select a
            per-batch input file (TODO confirm).
        kols_path: Path of the KOL table CSV.
        tweets_path: Path of the tweets table CSV (quoted, multi-line fields).

    Returns:
        A lazily evaluated Spark DataFrame with columns ``user_id`` and
        ``tweet_body``.
    """
    # `count` ignores nulls, so count(column) is the column's non-null count.
    from pyspark.sql.functions import count as count_non_null

    kols_df = spark.read.csv(kols_path, header=True, inferSchema=True)
    tweets_df = spark.read.csv(
        tweets_path, header=True, inferSchema=True, quote='"', escape='"', multiLine=True
    )

    # One aggregation pass over all columns instead of one Spark job per column.
    non_null_counts = kols_df.agg(*[count_non_null(kols_df[c]) for c in kols_df.columns]).first()
    non_empty_columns = [c for c, n in zip(kols_df.columns, non_null_counts) if n > 0]
    kols_df = kols_df.select(*non_empty_columns)

    # NOTE(review): positional slice -- depends on the CSV's column order after
    # empty columns are dropped; presumably the numeric metric columns.
    selected_columns = kols_df.columns[6:13]
    if selected_columns:
        # Every mean is taken from the pre-imputation frame (each column is
        # independent), computed in a single pass rather than one collect each.
        means = kols_df.agg(*[mean(col(c)) for c in selected_columns]).first()
        for column, mean_value in zip(selected_columns, means):
            kols_df = kols_df.withColumn(
                column,
                when(col(column).isNull(), mean_value).otherwise(col(column)).cast(IntegerType()),
            )

    # Loop-invariant flag, added once; guarded because empty-column removal
    # above could have dropped `is_verified`.
    if "is_verified" in kols_df.columns:
        kols_df = kols_df.withColumn("is_verified_num", when(col("is_verified"), 1).otherwise(0))

    combined_df = kols_df.join(tweets_df, kols_df.user_id == tweets_df.author_id).dropDuplicates()
    df = combined_df.select("user_id", "tweet_body")
    # Raw string: a plain "\s" literal is an invalid escape (SyntaxWarning on Python 3.12+).
    final_df = df.withColumn("tweet_body", regexp_replace("tweet_body", r"[^A-Za-z0-9\s]+", ""))

    return final_df

In [6]:
# Clean each dataset id and preview the first rows of the result.
saved_path = "path_to_save_data"  # destination for the (currently disabled) CSV write
data_id = [1]

for i in data_id:
    df = data_cleansing(i)
    df.show(5)
    # Persisting is disabled; uncomment to append the cleaned rows as CSV with a header:
    # df.write.mode("append").option("header", "true").csv(saved_path)

                                                                                

+-------------------+--------------------+
|            user_id|          tweet_body|
+-------------------+--------------------+
|1448333842427772933| TekioNFT WL Give...|
|1598261796997156864|15\n\nI made a po...|
|         1053471548|25 Giveaway  5 Ho...|
|         1053471548|50 Giveaway  24 H...|
|1596825240235032576|Weve created a ke...|
+-------------------+--------------------+
only showing top 5 rows



In [7]:
# Stop the SparkSession created at the top of the notebook once all work is done.
spark.stop()