In [73]:
sc

In [74]:
# sc master - running locally
sc.master

'local[*]'

In [75]:
# import SparkSession library 
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('data_processing').getOrCreate()

In [76]:
import warnings
warnings.filterwarnings("ignore")

In [77]:
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# Move the 'people.json' file to hadoop /user1
path = "/user5/ProjectTweets.csv"
#path = "file:///home/hduser/Downloads/second semester-2project"
body = spark.read.csv(path)
body.show

<bound method DataFrame.show of DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]>

# Data Preprocessing 

In [78]:
#Count the number of records
body.count()

                                                                                

1600000

In [79]:
# Number of columns
len(body.columns)

6

In [80]:
# Display the structure of schema
body.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [81]:
# Show the first 5 records
body.show(5)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



In [82]:
#show columns
body.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5']

In [86]:
# Display the summary of the data
body.summary().show()



+-------+------------------+--------------------+--------------------+--------+-------------------+--------------------+
|summary|               _c0|                 _c1|                 _c2|     _c3|                _c4|                 _c5|
+-------+------------------+--------------------+--------------------+--------+-------------------+--------------------+
|  count|           1600000|             1600000|             1600000| 1600000|            1600000|             1600000|
|   mean|          799999.5|1.9988175522956276E9|                null|    null|4.325887521835714E9|                null|
| stddev|461880.35968924535| 1.935760736227021E8|                null|    null|5.16273321845489E10|                null|
|    min|                 0|          1467810369|Fri Apr 17 20:30:...|NO_QUERY|       000catnap000|                 ...|
|    25%|          399999.0|       1.956901607E9|                null|    null|            32508.0|                null|
|    50%|          799999.0|    

                                                                                

In [87]:
renamed_body = body \
    .withColumnRenamed('_c0', 'user_id') \
    .withColumnRenamed('_c1', 'number') \
    .withColumnRenamed('_c2', 'time') \
    .withColumnRenamed('_c3', 'query') \
    .withColumnRenamed('_c4', 'username') \
    .withColumnRenamed('_c5', 'tweet')

renamed_body.show()

+-------+----------+--------------------+--------+---------------+--------------------+
|user_id|    number|                time|   query|       username|               tweet|
+-------+----------+--------------------+--------+---------------+--------------------+
|      0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|      3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|      4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|      5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|      6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|      7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|      8|1467811795|Mon Apr 06 2

In [88]:
#deleting column
renamed_body.drop('number').show()

+-------+--------------------+--------+---------------+--------------------+
|user_id|                time|   query|       username|               tweet|
+-------+--------------------+--------+---------------+--------------------+
|      0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|      1|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|      2|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|      3|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|      4|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|      5|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|      6|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|      7|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|      8|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|      9|Mon Apr 06 22:20:...|NO_QUERY|        mimismo|@twittera que me ...|

## Subset

In [89]:
# Display the subset of data
renamed_body.select(['time','tweet']).show()

+--------------------+--------------------+
|                time|               tweet|
+--------------------+--------------------+
|Mon Apr 06 22:19:...|@switchfoot http:...|
|Mon Apr 06 22:19:...|is upset that he ...|
|Mon Apr 06 22:19:...|@Kenichan I dived...|
|Mon Apr 06 22:19:...|my whole body fee...|
|Mon Apr 06 22:19:...|@nationwideclass ...|
|Mon Apr 06 22:20:...|@Kwesidei not the...|
|Mon Apr 06 22:20:...|         Need a hug |
|Mon Apr 06 22:20:...|@LOLTrish hey  lo...|
|Mon Apr 06 22:20:...|@Tatiana_K nope t...|
|Mon Apr 06 22:20:...|@twittera que me ...|
|Mon Apr 06 22:20:...|spring break in p...|
|Mon Apr 06 22:20:...|I just re-pierced...|
|Mon Apr 06 22:20:...|@caregiving I cou...|
|Mon Apr 06 22:20:...|@octolinz16 It it...|
|Mon Apr 06 22:20:...|@smarrison i woul...|
|Mon Apr 06 22:20:...|@iamjazzyfizzle I...|
|Mon Apr 06 22:20:...|Hollis' death sce...|
|Mon Apr 06 22:20:...|about to file taxes |
|Mon Apr 06 22:20:...|@LettyA ahh ive a...|
|Mon Apr 06 22:20:...|@FakerPatt

In [90]:
from pyspark.sql.functions import col

# Filter the DataFrame for entries in May and select only 'time' and 'tweet' columns
renamed_body.filter(col('time').substr(5, 3) == 'May').select('time', 'tweet').show()


+--------------------+--------------------+
|                time|               tweet|
+--------------------+--------------------+
|Fri May 01 20:08:...|jayseto@KevinSpac...|
|Fri May 01 20:08:...|Bad hair day for ...|
|Fri May 01 20:08:...|It's N-O-N  P-H-I...|
|Fri May 01 20:08:...|    Twitter hate me |
|Fri May 01 20:08:...|did i mention, I ...|
|Fri May 01 20:08:...|swine flu attacks...|
|Fri May 01 20:08:...|Please God, make ...|
|Fri May 01 20:08:...|Wants to walk to ...|
|Fri May 01 20:08:...|@minnie_boo oh no...|
|Fri May 01 20:08:...|I didn't win the ...|
|Fri May 01 20:08:...|Tuesday and I are...|
|Fri May 01 20:08:...|@MsAmeliaBadila n...|
|Fri May 01 20:08:...|Michelle is sad t...|
|Fri May 01 20:08:...|@justashley  my h...|
|Fri May 01 20:09:...|study for finals ...|
|Fri May 01 20:09:...|@amyprutch hmmm.....|
|Fri May 01 20:09:...|misses having all...|
|Fri May 01 20:09:...|@IncredibleLAGO t...|
|Fri May 01 20:09:...|right about now i...|
|Fri May 01 20:09:...|Argh! Ther

In [91]:

# Filter the DataFrame for entries in April and using 'time' and 'username' columns
renamed_body.filter(col('time').substr(5, 3) == 'Apr').select('time', 'username','tweet').show()


+--------------------+---------------+--------------------+
|                time|       username|               tweet|
+--------------------+---------------+--------------------+
|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|
|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|
|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|
|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|
|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|
|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|
|Mon Apr 06 22:20:...|        mybirch|         Need a hug |
|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|
|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K nope t...|
|Mon Apr 06 22:20:...|        mimismo|@twittera que me ...|
|Mon Apr 06 22:20:...| erinx3leannexo|spring break in p...|
|Mon Apr 06 22:20:...|   pardonlauren|I just re-pierced...|
|Mon Apr 06 22:20:...|           TLeC|@caregiving I cou...|
|Mon Apr 06 22:20:...|robrobbierobert|@o

In [92]:
# Filtering based on conditions for 'time' and 'username'
renamed_body.where((renamed_body['time'].like('%Apr%')) & (renamed_body['username'] == 'kateblogs')).show()


[Stage 124:>                                                        (0 + 1) / 1]

+-------+----------+--------------------+--------+---------+--------------------+
|user_id|    number|                time|   query| username|               tweet|
+-------+----------+--------------------+--------+---------+--------------------+
|   2755|1468444945|Tue Apr 07 01:42:...|NO_QUERY|kateblogs|@rhyswynne That's...|
| 831109|1557400928|Sun Apr 19 03:53:...|NO_QUERY|kateblogs|Good morning  Or ...|
| 831327|1557429675|Sun Apr 19 04:03:...|NO_QUERY|kateblogs|@liamvickery Hi  ...|
+-------+----------+--------------------+--------+---------+--------------------+



                                                                                

# Aggregations

In [93]:
renamed_body.groupBy('username').count().show()

[Stage 125:>                                                        (0 + 2) / 2]

+---------------+-----+
|       username|count|
+---------------+-----+
|     megan_rice|   15|
|        Daniiej|    3|
|         MeghTW|    1|
|   candicebunny|    1|
|stranger_danger|   14|
|  divingkid2001|    1|
|    Lilli_Allen|    1|
|        caaaami|    1|
|       J_Moneyy|    7|
|        SoEdith|    5|
|     convoy3571|   13|
|       kyrabeth|    1|
|      kateblogs|   75|
|    lovelylivxo|   16|
|       irlbinky|    3|
|        Ste1987|   50|
|       squintoo|    1|
|     PhantomV48|    2|
|        sophizz|    2|
|      tink68113|    1|
+---------------+-----+
only showing top 20 rows



                                                                                

In [94]:
from pyspark.sql import SparkSession

# 'user_id, number and query are being exclude from the analysis

for col in renamed_body.columns:
    if col not in['user_id','number','query']:
        print(f" *** Aggregation for {col} ***")
        renamed_body.groupBy(col).count().orderBy('count', ascending=False).show(truncate=False)



 *** Aggregation for time ***


                                                                                

+----------------------------+-----+
|time                        |count|
+----------------------------+-----+
|Mon Jun 15 12:53:14 PDT 2009|20   |
|Fri May 22 05:10:17 PDT 2009|17   |
|Mon Jun 15 13:39:50 PDT 2009|17   |
|Fri May 29 13:40:04 PDT 2009|17   |
|Fri Jun 05 11:05:33 PDT 2009|16   |
|Fri Jun 05 14:13:07 PDT 2009|16   |
|Mon Jun 01 12:25:21 PDT 2009|15   |
|Mon Jun 15 10:39:32 PDT 2009|15   |
|Sat Jun 06 11:59:49 PDT 2009|15   |
|Fri May 22 08:34:27 PDT 2009|15   |
|Fri Jun 05 09:45:09 PDT 2009|15   |
|Fri May 29 10:09:34 PDT 2009|15   |
|Sat May 30 07:53:15 PDT 2009|15   |
|Fri Jun 05 14:17:04 PDT 2009|15   |
|Mon Jun 01 15:19:07 PDT 2009|15   |
|Sat Jun 06 15:22:24 PDT 2009|14   |
|Fri May 29 12:41:22 PDT 2009|14   |
|Mon Jun 15 13:46:10 PDT 2009|14   |
|Fri Jun 05 14:13:06 PDT 2009|14   |
|Sat May 30 10:17:56 PDT 2009|14   |
+----------------------------+-----+
only showing top 20 rows

 *** Aggregation for username ***




+---------------+-----+
|username       |count|
+---------------+-----+
|lost_dog       |549  |
|webwoke        |345  |
|tweetpet       |310  |
|SallytheShizzle|281  |
|VioletsCRUK    |279  |
|mcraddictal    |276  |
|tsarnick       |248  |
|what_bugs_u    |246  |
|Karen230683    |238  |
|DarkPiano      |236  |
|SongoftheOss   |227  |
|Jayme1988      |225  |
|keza34         |219  |
|ramdomthoughts |216  |
|shanajaca      |213  |
|wowlew         |212  |
|nuttychris     |211  |
|TraceyHewins   |211  |
|thisgoeshere   |207  |
|Spidersamm     |205  |
+---------------+-----+
only showing top 20 rows

 *** Aggregation for tweet ***




+-------------------------------------------------------------------------------------------------------+-----+
|tweet                                                                                                  |count|
+-------------------------------------------------------------------------------------------------------+-----+
|isPlayer Has Died! Sorry                                                                               |210  |
|good morning                                                                                           |118  |
|headache                                                                                               |115  |
|Good morning                                                                                           |112  |
|Headache                                                                                               |106  |
|Not to worry, noone got that one. Next question starts in 1 minute, get your thinking caps on          

                                                                                

# Sorting

In [95]:
renamed_body.sort("user_id", ascending=False).show()



+-------+----------+--------------------+--------+---------------+--------------------+
|user_id|    number|                time|   query|       username|               tweet|
+-------+----------+--------------------+--------+---------------+--------------------+
| 999999|1879943219|Thu May 21 23:36:...|NO_QUERY|     redcomet81|@MsTeagan ...and ...|
| 999998|1879943113|Thu May 21 23:36:...|NO_QUERY|        virmani|@jigardoshi neah....|
| 999997|1879942975|Thu May 21 23:36:...|NO_QUERY|          znmeb|@Brat13 Hell, Win...|
| 999996|1879942922|Thu May 21 23:36:...|NO_QUERY|       nick1975|@vactress http://...|
| 999995|1879942807|Thu May 21 23:36:...|NO_QUERY|        divabat|@healingsinger th...|
| 999994|1879942758|Thu May 21 23:36:...|NO_QUERY|michebuckingham|@dannygokey I can...|
| 999993|1879942640|Thu May 21 23:36:...|NO_QUERY|        CB_PWNS|Almost whole week...|
| 999992|1879942629|Thu May 21 23:36:...|NO_QUERY|        AbbyRom|here in my cousin...|
| 999991|1879942610|Thu May 21 2

                                                                                

In [96]:

# Group by 'username' and find the average 'user_id' for each 'username'
renamed_body.groupBy('username').agg(F.avg('user_id').alias('avg_user_id')).orderBy('avg_user_id', ascending=False).show(50, False)


[Stage 138:>                                                        (0 + 2) / 2]

+---------------+-----------+
|username       |avg_user_id|
+---------------+-----------+
|bpbabe         |1599997.0  |
|AmandaMarie1028|1599995.0  |
|EvolveTom      |1599994.0  |
|sdancingsteph  |1599992.0  |
|cathriiin      |1599987.0  |
|LISKFEST       |1599985.0  |
|RobFoxKerr     |1599984.0  |
|xoAurixo       |1599983.0  |
|puchal_ek      |1599980.0  |
|angel_sammy04  |1599979.0  |
|LaurenMoo10    |1599978.0  |
|millerslab     |1599972.0  |
|adbillingsley  |1599970.0  |
|bobzipp        |1599969.0  |
|paligurl93     |1599967.0  |
|heartcures     |1599964.0  |
|dacyj          |1599962.0  |
|tattoodancer35 |1599956.0  |
|charitojoy     |1599954.0  |
|stephyway      |1599951.0  |
|xSandraaaaaa   |1599949.0  |
|martybaltiero  |1599942.0  |
|ajenne         |1599939.0  |
|Essence05      |1599935.0  |
|tly23452       |1599934.0  |
|tkosofsky      |1599929.0  |
|Marianne_P     |1599926.0  |
|followtweety   |1599921.0  |
|bendotorg      |1599920.0  |
|Belga777       |1599918.0  |
|spiderylu

                                                                                

# Collect

In [97]:
# Collect _set 
renamed_body.groupBy("username").agg(F.collect_set("tweet").alias("specific_tweets")).show()




+---------------+--------------------+
|       username|     specific_tweets|
+---------------+--------------------+
|000matthewkelly|[Just had a faceb...|
|      000yea000|[thank God we fin...|
|      0010x0010|[@JoannaGoff  Rea...|
|    001BabyGirl|[@ianvisagie Why?...|
|    007LouiseOB|[@juliana6 I sit ...|
|      007buddha|[OMG one hour lim...|
|       007peter|[@NaniWaialeale â...|
|      007wisdom|[&quot;All that w...|
|    00Jessica81|[ I think I pulle...|
|        00KEVEN|[@Siope haven't h...|
|    00KufiKyd00|[@MontyAyE OmG bo...|
|       00Sleepy|[@bindermichi the...|
|          00Syd|[7 days until I c...|
|    00YungSwagg|[I HATE MOVING AN...|
|    00blondey00|[good luck fuzzy ...|
|          00cat|[@DeclanCopyright...|
|      00ceilidh|[a mosquito just ...|
|          00deb|[@ItsJustDave Wha...|
|       00kate00|[its been pouring...|
|      00polloto|[wasting time on ...|
+---------------+--------------------+
only showing top 20 rows



                                                                                

In [98]:
### UDFs: User defined functions

In [103]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a function to categorize tweets based on their length
def tweet_category(tweet):
    if len(tweet) <= 50:
        return 'Short'
    elif len(tweet) <= 100:
        return 'Medium'
    else:
        return 'Long'

# Convert the function to a User Defined Function (UDF)
tweet_category_udf = udf(tweet_category, StringType())

# Apply the UDF to create a new column 'tweet_category'
renamed_body = renamed_body.withColumn('tweet_category', tweet_category_udf(renamed_body['tweet']))

# Display the original 'tweet' and the newly created 'tweet_category'
renamed_body.select('tweet', 'tweet_category').show()


+--------------------+--------------+
|               tweet|tweet_category|
+--------------------+--------------+
|@switchfoot http:...|          Long|
|is upset that he ...|          Long|
|@Kenichan I dived...|        Medium|
|my whole body fee...|         Short|
|@nationwideclass ...|          Long|
|@Kwesidei not the...|         Short|
|         Need a hug |         Short|
|@LOLTrish hey  lo...|        Medium|
|@Tatiana_K nope t...|         Short|
|@twittera que me ...|         Short|
|spring break in p...|         Short|
|I just re-pierced...|         Short|
|@caregiving I cou...|        Medium|
|@octolinz16 It it...|        Medium|
|@smarrison i woul...|          Long|
|@iamjazzyfizzle I...|          Long|
|Hollis' death sce...|        Medium|
|about to file taxes |         Short|
|@LettyA ahh ive a...|        Medium|
|@FakerPattyPattz ...|        Medium|
+--------------------+--------------+
only showing top 20 rows



                                                                                

In [105]:
renamed_body.groupby("tweet_category").count().show()



+--------------+------+
|tweet_category| count|
+--------------+------+
|        Medium|658103|
|          Long|431083|
|         Short|510814|
+--------------+------+



                                                                                

# Joinging 

In [113]:
renamed_body.groupby("username").count().show(50, False)



+---------------+-----+
|username       |count|
+---------------+-----+
|megan_rice     |15   |
|Daniiej        |3    |
|MeghTW         |1    |
|candicebunny   |1    |
|stranger_danger|14   |
|divingkid2001  |1    |
|Lilli_Allen    |1    |
|caaaami        |1    |
|J_Moneyy       |7    |
|SoEdith        |5    |
|convoy3571     |13   |
|kyrabeth       |1    |
|kateblogs      |75   |
|lovelylivxo    |16   |
|irlbinky       |3    |
|Ste1987        |50   |
|squintoo       |1    |
|PhantomV48     |2    |
|sophizz        |2    |
|tink68113      |1    |
|melliejellie   |2    |
|Merlene        |14   |
|marybethbeech  |12   |
|Svalentyna     |1    |
|jimdangereux   |1    |
|b_a88          |3    |
|oobinsnaffa    |2    |
|quickbrownfoxnc|1    |
|missmeganbunny |1    |
|bakerbelle     |1    |
|backinstereo   |2    |
|_CastrO_       |1    |
|Emogirltalk    |3    |
|lauraxz        |1    |
|StinaStar      |2    |
|bayuhlee       |5    |
|Keleigh5454    |1    |
|DIYSara        |26   |
|xPhaedra77x    

                                                                                

In [116]:
from pyspark.sql.types import StructType, StructField, StringType

username_data = spark.createDataFrame([
    ('megan_rice', 'MR'),
    ('erikarhanetan ', 'EK'),
    ('DIYSara', 'DS'),
    ('Merlene', 'DL'),
    ('marybethbeech', 'MB'),
    ('kateblogs', 'KB'),
    ('lovelylivxo', 'LX')
], schema=StructType().add("username", "string").add("Abbreviation", "string"))


In [117]:
username_data.show()

[Stage 161:>                                                        (0 + 1) / 1]

+--------------+------------+
|      username|Abbreviation|
+--------------+------------+
|    megan_rice|          MR|
|erikarhanetan |          EK|
|       DIYSara|          DS|
|       Merlene|          DL|
| marybethbeech|          MB|
|     kateblogs|          KB|
|   lovelylivxo|          LX|
+--------------+------------+



                                                                                

In [119]:

new_df=renamed_body.join(username_data,on='username')

In [120]:
new_df.groupby("Abbreviation").count().show(50,False)

[Stage 167:>                                                        (0 + 2) / 2]

+------------+-----+
|Abbreviation|count|
+------------+-----+
|LX          |16   |
|DL          |14   |
|MR          |15   |
|MB          |12   |
|KB          |75   |
|DS          |26   |
+------------+-----+



                                                                                

# Working with the dataframe using Pyspark

In [122]:

# Creates a temporary view using the DataFrame
body.createOrReplaceTempView("tweets")

# Display the structure of the DataFrame
body.show()


+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [123]:
# Define a list of strings representing JSON objects
jsonStrings = [
    '{"user_id":0,"time":"Mon Apr 06 22:19:45 PDT 2009","query":"NO_QUERY","username":"_TheSpecialOne_","tweet":"@switchfoot http://twitpic.com/2y1zl - Awww, that\'s a bummer.  You shoulda got David Carr of Third Day to do it. ;D"}',
    '{"user_id":1,"time":"Mon Apr 06 22:19:49 PDT 2009","query":"NO_QUERY","username":"scotthamilton","tweet":"is upset that he can\'t update his Facebook by texting it... and might cry as a result  School today also. Blah!"}',
    '{"user_id":2,"time":"Mon Apr 06 22:19:53 PDT 2009","query":"NO_QUERY","username":"mattycus","tweet":"@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds"}',
    # Add more JSON strings as needed
]

# Create an RDD[String] from the list of JSON strings
jsonStringsRDD = sc.parallelize(jsonStrings)

# Create a DataFrame for the JSON dataset
jsonDF = spark.read.json(jsonStringsRDD)

# Show the contents of the new DataFrame
jsonDF.show()


+--------+--------------------+--------------------+-------+---------------+
|   query|                time|               tweet|user_id|       username|
+--------+--------------------+--------------------+-------+---------------+
|NO_QUERY|Mon Apr 06 22:19:...|@switchfoot http:...|      0|_TheSpecialOne_|
|NO_QUERY|Mon Apr 06 22:19:...|is upset that he ...|      1|  scotthamilton|
|NO_QUERY|Mon Apr 06 22:19:...|@Kenichan I dived...|      2|       mattycus|
+--------+--------------------+--------------------+-------+---------------+



In [126]:
# Define the schema for the DataFrame
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("number_id", StringType(), True),
    StructField("time", StringType(), True),
    StructField("query", StringType(), True),
    StructField("username", StringType(), True),
    StructField("tweet", StringType(), True)
])


In [128]:
# Create a new DataFrame with null values
body = spark.createDataFrame([
    ("0", None, "Mon Apr 06 22:19:45 PDT 2009", "NO_QUERY", "_TheSpecialOne_", "@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer. You shoulda got David Carr of Third Day to do it. ;D"),
    ("1", "1467810672", "Mon Apr 06 22:19:49 PDT 2009", "NO_QUERY", "scotthamilton", "is upset that he can't update his Facebook by texting it... and might cry as a result School today also. Blah!"),
    ("2", "1467810917", "Mon Apr 06 22:19:53 PDT 2009", "NO_QUERY", "mattycus", "@Kenichan I dived many times for the ball. Managed to save 50% The rest go out of bounds")
], schema=schema)
# Show the DataFrame with null values
body.show()

+-------+----------+--------------------+--------+---------------+--------------------+
|user_id| number_id|                time|   query|       username|               tweet|
+-------+----------+--------------------+--------+---------------+--------------------+
|      0|      null|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
+-------+----------+--------------------+--------+---------------+--------------------+



In [129]:
# Fill all null values with 0
body.fillna('0').show()

+-------+----------+--------------------+--------+---------------+--------------------+
|user_id| number_id|                time|   query|       username|               tweet|
+-------+----------+--------------------+--------+---------------+--------------------+
|      0|         0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
+-------+----------+--------------------+--------+---------------+--------------------+



In [130]:
# Fill null values with specific values for each column
body_filled = body.fillna({
    'number_id': '0',  # Replace null values in _c1 with '0'
    'time': 'Unknown',  # Replace null values in _c2 with 'Unknown'
    'query': 'NO_QUERY',
    'username': 'none',
    'tweet': 'No tweet'
})


In [131]:
body_filled.show(10)

+-------+----------+--------------------+--------+---------------+--------------------+
|user_id| number_id|                time|   query|       username|               tweet|
+-------+----------+--------------------+--------+---------------+--------------------+
|      0|         0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
+-------+----------+--------------------+--------+---------------+--------------------+



In [132]:
# Return new df omitting rows with null values
body.na.drop().show()

+-------+----------+--------------------+--------+-------------+--------------------+
|user_id| number_id|                time|   query|     username|               tweet|
+-------+----------+--------------------+--------+-------------+--------------------+
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|
+-------+----------+--------------------+--------+-------------+--------------------+



In [133]:
body.na.drop(subset='number_id').show()

+-------+----------+--------------------+--------+-------------+--------------------+
|user_id| number_id|                time|   query|     username|               tweet|
+-------+----------+--------------------+--------+-------------+--------------------+
|      1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|
+-------+----------+--------------------+--------+-------------+--------------------+



In [134]:
#deleting column 
body.drop('query').show()

+-------+----------+--------------------+---------------+--------------------+
|user_id| number_id|                time|       username|               tweet|
+-------+----------+--------------------+---------------+--------------------+
|      0|      null|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|
|      1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|
|      2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|
+-------+----------+--------------------+---------------+--------------------+

