In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from google.cloud import storage

# Environment setup

In [2]:
# Build the Spark session with the GCS connector jar on the classpath.
# NOTE(review): master("local") runs Spark in-process; the executor-level
# settings below (executor memory / cores) presumably have little or no effect
# in that mode -- confirm whether "local[3]" was intended.
builder = SparkSession.builder.appName("SimpleSparkJob").master("local")

spark_settings = {
    # Jar providing the gs:// filesystem implementation.
    "spark.jars": "/opt/spark/jars/gcs-connector-latest-hadoop2.jar",
    # Memory requested per executor.
    "spark.executor.memory": "2G",
    # Memory requested for the driver.
    "spark.driver.memory": "4G",
    # Cores requested per executor.
    "spark.executor.cores": "3",
    # Memory per Python worker process.
    "spark.python.worker.memory": "1G",
    # Upper bound on the total size of results returned to the driver.
    "spark.driver.maxResultSize": "3G",
    # Maximum Kryo serialization buffer size.
    "spark.kryoserializer.buffer.max": "1024M",
}
for key, value in spark_settings.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()

# Point the GCS connector at the service-account key file and register the
# gs:// filesystem implementation.
# NOTE(review): the key file path is set on the Spark runtime conf while the
# other two settings go to the Hadoop configuration -- confirm this is intended.
hadoop_conf = spark._jsc.hadoopConfiguration()
spark.conf.set("google.cloud.auth.service.account.json.keyfile","/opt/spark/lucky-wall-393304-2a6a3df38253.json")
hadoop_conf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
hadoop_conf.set('fs.gs.auth.service.account.enable', 'true')


your 131072x1 screen size is bogus. expect trouble
23/12/08 22:52:03 WARN Utils: Your hostname, HXV-X1 resolves to a loopback address: 127.0.1.1; using 172.30.29.145 instead (on interface eth0)
23/12/08 22:52:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/12/08 22:52:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/08 22:52:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Data preprocessing

In [30]:
# Read the JSON input under ../data; the "multiline" option allows a single
# JSON record to span several lines.
df = (
    spark.read
    .option("multiline", True)
    .json("../data")
)

# Print the inferred schema to inspect the nested fields.
df.printSchema()

root
 |-- _type: string (nullable = true)
 |-- cashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- conversationId: long (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- date: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- id_str: string (nullable = true)
 |-- inReplyToTweetId: long (nullable = true)
 |-- inReplyToUser: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- displayname: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- username: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- links: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tcourl: string (nullable = true)
 |    |    |-- text: string (n

In [115]:
# Build the feature table with Spark SQL: the user id, four boolean profile
# flags, ten numeric user features, and the raw tweet text.
# `select distinct` removes rows that are identical across all selected columns.
#
# NOTE(review): the `data` view/table is not registered in any visible cell --
# presumably a temp view created from `df`; confirm it exists on a fresh kernel.
# NOTE(review): nameLength is computed from user.username and screenNameLength
# from user.displayname; this looks swapped relative to the usual Twitter
# meaning of "name" vs "screen name" -- confirm against the reference feature set.
# NOTE(review): createdAt goes through to_date(), so it is truncated to day
# granularity and presumably depends on the session time zone -- confirm.
# NOTE(review): followerFriendRatio divides by user.friendsCount; check how
# rows with friendsCount = 0 should be handled downstream.
df_features = spark.sql("""
select distinct
    user.id,
    -- 4 among top 10 boolean user features having highest IG with human/bot label
    size(user.descriptionLinks) > 0 as hasURL,
    user.location <> "" as geoEnabled,
    user.verified,
    user.profileBannerUrl is not null as profileBackgroungImageURL,
    -- top 10 numerical user features having highest IG with human/bot label
    user.followersCount / user.friendsCount as followerFriendRatio,
    user.listedCount,
    len(user.rawDescription) as descriptionLength,
    user.followersCount,
    len(user.username) as nameLength,
    to_unix_timestamp(to_date(user.created)) as createdAt,
    user.friendsCount,
    user.statusesCount,
    user.favouritesCount,
    len(user.displayname) as screenNameLength,
    -- tweet
    rawContent
from data
""")

In [116]:
# Preview the first 20 feature rows, truncating long cell values (the defaults
# of DataFrame.show, spelled out explicitly).
df_features.show(n=20, truncate=True)

[Stage 74:>                                                         (0 + 1) / 1]

+-------------------+------+----------+--------+-------------------------+-------------------+-----------+-----------------+--------------+----------+----------+------------+-------------+---------------+----------------+--------------------------------+
|                 id|hasURL|geoEnabled|verified|profileBackgroungImageURL|followerFriendRatio|listedCount|descriptionLength|followersCount|nameLength| createdAt|friendsCount|statusesCount|favouritesCount|screenNameLength|                      rawContent|
+-------------------+------+----------+--------+-------------------------+-------------------+-----------+-----------------+--------------+----------+----------+------------+-------------+---------------+----------------+--------------------------------+
| 874297334615638016|  true|      true|   false|                     true| 1.8705357142857142|          1|                4|           838|         7|1497200400|         448|         2210|          25733|               8|            @h

                                                                                

In [117]:
# Persist the engineered features as Parquet for later steps.
# mode("overwrite") replaces any previous output so that re-running this cell
# (e.g. Restart Kernel -> Run All) succeeds; the default save mode raises an
# error when the target path already exists.
# NOTE(review): the output lands inside ../data, the same directory the raw
# JSON is read from -- consider a separate output directory.
df_features.write.mode("overwrite").parquet("../data/preprocessed_data.parquet")

                                                                                