In [11]:
import os

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, array

In [2]:
def create_spark_session(hadoop_aws_package="org.apache.hadoop:hadoop-aws:3.2.0"):
    """Build (or reuse) the SparkSession used to read from / write to MinIO via S3A.

    Args:
        hadoop_aws_package: Maven coordinate passed to ``spark.jars.packages`` so
            the S3A filesystem classes are on the classpath. Defaults to the
            coordinate this notebook has always used.
            NOTE(review): this version should match the Hadoop version bundled
            with the local Spark distribution — confirm when upgrading Spark.

    Returns:
        The ``SparkSession`` returned by ``builder.getOrCreate()``.
    """
    return (
        SparkSession.builder
        .config("spark.jars.packages", hadoop_aws_package)
        # Map the s3a:// scheme to the S3A filesystem implementation.
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate()
    )

In [3]:
# Build the notebook-wide SparkSession (S3A connector configured inside create_spark_session).
spark = create_spark_session()

In [4]:
# Display the session object as a quick sanity check that Spark started.
spark

In [5]:
# MinIO connection settings. Values are taken from the environment when set, so
# credentials need not be stored in the notebook; the fallbacks are the values
# previously hard-coded here.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "127.0.0.1:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

sc = spark.sparkContext
# NOTE: `_jsc` is a private PySpark attribute; it exposes the JVM-side Hadoop
# Configuration object that the S3A filesystem reads its fs.s3a.* settings from.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.s3a.endpoint", MINIO_ENDPOINT)
hadoop_conf.set("fs.s3a.access.key", MINIO_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", MINIO_SECRET_KEY)
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
# Keys set directly on the Hadoop Configuration must NOT carry the "spark.hadoop."
# prefix — Spark strips that prefix only for SparkConf entries. The previous key
# "spark.hadoop.fs.s3a.path.style.access" was therefore never seen by S3A.
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# 1. Read JSON files from MinIO
### Let Spark infer the schema

In [6]:
# No explicit schema is supplied, so Spark infers it from the JSON data itself.
df = spark.read.format("json").load("s3a://tweets/*.json")

In [8]:
# Inspect the inferred (deeply nested) schema of the raw tweet records.
df.printSchema()

root
 |-- kafka_consume_ts: string (nullable = true)
 |-- message: struct (nullable = true)
 |    |-- contributors: string (nullable = true)
 |    |-- coordinates: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |-- media: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |    |-- ele

In [9]:
# Peek at the first two raw records.
df.show(n=2)

+----------------+--------------------+
|kafka_consume_ts|             message|
+----------------+--------------------+
|      2020-01-26|{null, null, Sat ...|
|      2020-01-26|{null, null, Thu ...|
+----------------+--------------------+
only showing top 2 rows



### I don't need the `kafka_consume_ts` field



In [12]:
# Flatten the `message` struct into top-level columns (dropping kafka_consume_ts).
# NOTE(review): df3 is not referenced by the later cells shown here; they repeat
# this projection inline.
df3 = df.select("message.*")

# 2,3
### Extract the retweeted tweet (if it exists), drop the `user` field, and select the fields needed to answer the questions

In [13]:
# Keep only messages that carry a retweeted tweet, then project the retweeted
# tweet's id, text and created_at. The final projection already discards `user`,
# so the former `.drop('user')` step did no work.
df_retweet = (
    df.where(col("message.retweeted_status").isNotNull())
    .select("message.retweeted_status.*")
    .select("id", "text", "created_at")
)

# 2,3
### Extract the quoted tweet (if it exists), drop the `user` field, and select the fields needed to answer the questions

In [14]:
# Keep only messages that quote another tweet, then project the quoted tweet's
# id, text and created_at. The explicit isNotNull filter states the intent of
# the former `.na.drop()`; the final projection already discards `user`, so a
# separate drop step is unnecessary.
df_quoted = (
    df.where(col("message.quoted_status").isNotNull())
    .select("message.quoted_status.*")
    .select("id", "text", "created_at")
)

# 2,3
### Drop `user`, `quoted_status`, and `retweeted_status` from the original dataframe, creating a new dataframe with only the fields needed to answer the questions

In [17]:
# Top-level tweets: take id, text and created_at straight from `message`.
# The nested `user`, `quoted_status` and `retweeted_status` columns are not in
# the selected set, so dropping them beforehand (as before) changed nothing.
new_df = df.select("message.*").select("id", "text", "created_at")

# 3
### Merge the 3 dataframes

In [33]:
# unionByName matches columns by name instead of by position, so the merge stays
# correct even if one of the three projections lists its columns in another order.
merged_df = new_df.unionByName(df_quoted).unionByName(df_retweet)

In [34]:
# Peek at the first three merged rows.
merged_df.show(n=3)

+-------------------+--------------------+--------------------+
|                 id|                text|          created_at|
+-------------------+--------------------+--------------------+
|1220900167106297857|براي ورود به وب س...|Sat Jan 25 02:44:...|
|1220257228411867136|@MrzHamed من بانك...|Thu Jan 23 08:09:...|
|1220899563248091139|سایت های ایرانی ک...|Sat Jan 25 02:41:...|
+-------------------+--------------------+--------------------+
only showing top 3 rows



# 4 
### Remove duplicate tweets.

In [35]:
# The same tweet id can show up more than once after merging top-level,
# quoted and retweeted tweets; keep a single row per id. Which of the
# duplicate rows survives is not specified.
df_with_unique_tweet = merged_df.drop_duplicates(subset=["id"])

In [36]:
# Number of unique tweets remaining after deduplication.
df_with_unique_tweet.count()

316

# 5,6
### Remove space characters from text fields.

##### Because I have already done this operation on the user data with Spark SQL, I will reuse it for the tweet data.

### I will also use it to convert the `created_at` field to a date in year-month-day format.

In [37]:
# Register the deduplicated tweets as the temporary view `tweetdata` so the
# next cell can query them with Spark SQL.
df_with_unique_tweet.createOrReplaceTempView('tweetdata')

In [39]:
# - text: remove every ' ' character from the tweet text.
# - created_at: raw values look like "Sat Jan 25 02:44:..." (see merged output).
#   substr(..., 5) skips the leading day-of-week because Spark 3 datetime
#   patterns allow the day-of-week letter 'E' only for formatting, not parsing.
#   to_date yields a real DATE column (rendered yyyy-MM-dd) instead of the
#   string that date_format produced, matching the "convert to a date" goal.
# NOTE(review): the timestamp-to-date conversion uses the session time zone
# (spark.sql.session.timeZone); confirm that is the intended calendar day.
semi_final_df = spark.sql('''
select
    id,
    replace(text, ' ', '') AS text,
    to_date(to_timestamp(substr(created_at, 5), 'MMM dd HH:mm:ss Z yyyy')) AS created_at
from tweetdata
''')

# 7
### Partition dataframe based on created_at date.


In [54]:
# Hash-partition rows on created_at into 10 partitions. All rows sharing a date
# land in the same partition, but several dates may share one partition (and
# some partitions may be empty), so this alone is not "one partition per date".
final_df = semi_final_df.repartition(10, "created_at")

In [55]:
# Confirm the repartition step produced the requested number of partitions.
final_df.rdd.getNumPartitions()

10

# 8
### Load each partition into a separate folder in MinIO. Folder names should be set according to the partition name.

##### I'm not sure this is correct.

In [56]:
# partitionBy("created_at") writes one sub-folder per distinct date, named
# "created_at=<date>", which is what step 8 asks for; a plain .csv() call writes
# every partition's part-file into the single "tweets" folder. The date is
# encoded in the folder name instead of inside each CSV row. Because final_df
# is hash-partitioned on created_at, each date's rows come from a single task,
# so each date folder receives one part-file.
# NOTE: the default save mode fails if s3a://partweet/tweets already exists.
final_df.write.partitionBy("created_at").csv("s3a://partweet/tweets", header=True)