# Module 9. Scalable Data Processing

## Mandatory task 5. Analysis of tweets.

### Pablo Yañez Martin

# RDD

In [1]:
import os

from pyspark import SparkConf, SparkContext

In [2]:
# Location of the tab-separated tweets file. The TWEETS_TSV_PATH environment
# variable, when set, overrides the default below so the notebook can run on
# another machine without editing this cell.
ruta = os.environ.get(
    "TWEETS_TSV_PATH",
    "D:/Dropbox/Pablo/Master/Modulo 9. Scalable Data Processing/Java/Data/tweets.tsv",
)

In [3]:
# Configuration for the RDD part: application name "Tweets_RDD", local master
# with 2 worker threads.
spark_conf = SparkConf().setAppName("Tweets_RDD").setMaster("local[2]")
# getOrCreate reuses a context that is already running, so re-executing this
# cell does not fail because a SparkContext already exists in the kernel.
# Note: when a context is reused, `spark_conf` is not applied to it.
spark_context = SparkContext.getOrCreate(conf=spark_conf)

In [4]:
# Reduce console noise: set the "org" log4j logger to WARN, reaching log4j
# through the context's `_jvm` handle.
logger = spark_context._jvm.org.apache.log4j
org_logger = logger.LogManager.getLogger("org")
org_logger.setLevel(logger.Level.WARN)

In [5]:
# Read the tweets file as an RDD of text lines.
lectura = spark_context.textFile(name=ruta)

In [6]:
# Break every line into its tab-separated fields (one list of strings per line).
columnas = lectura.map(lambda fila: fila.split("\t"))

#### 1. The 10 most repeated words in all the tweets.

In [7]:
# Tokens excluded from the word ranking (retweet marker, a link, hashtags and
# common English words). Tweets are tokenised by splitting on a single space,
# so an entry containing a space could never match and has been dropped; the
# empty string covers the blank tokens produced by consecutive spaces. The
# entries match the stop words used in the DataFrame section of this notebook.
lista = (
    'RT', 'http://t.co/t3KHlNvtIz', 'Big12Conference:', 'I',
    '#Big12', '#Big12MBB:', '#Big12MBB', '', 'set',
    'the', 'and', 'will', 'is', 'has', 'The',
    'of', 'a', 'in', 'to', 'at', 'with', '-', 'for', 'on',
)

In [8]:
# Word count over the tweet text (third tab-separated field), skipping the
# tokens listed in `lista`. The result is a list of the 10 (word, count) pairs
# with the highest counts, in descending order of count.
resultado1 = (
    columnas
    .map(lambda array: array[2])
    .flatMap(lambda tuit: tuit.split(' '))
    .filter(lambda word: word not in lista)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    # top() keeps only the 10 largest pairs instead of sorting the whole RDD.
    # The order among pairs with equal counts is not guaranteed.
    .top(10, key=lambda par: par[1])
)

In [9]:
# Report each selected word next to its number of occurrences.
print('The most repeated words are: ')
for (word, frequency) in resultado1:
    print('{} : {}'.format(word, frequency))

The most repeated words are: 
Big12Conference : 191
Championship : 134
quarterfinal : 127
Thursday's : 125
BaylorMBB : 124
bracket : 117
updated : 117
matchups : 117
TCUBasketball : 109
tomorrow : 97


#### 2. The user who has written the most tweets.

In [10]:
# Number of tweets per user (second tab-separated field). The result is a
# one-element list holding the (user, count) pair with the highest count.
resultado2 = (
    columnas
    .map(lambda array: (array[1], 1))
    .reduceByKey(lambda a, b: a + b)
    # Only the maximum is needed, so select it with top() rather than sorting
    # every (user, count) pair.
    .top(1, key=lambda par: par[1])
)

In [11]:
# Report the most active user together with the number of tweets written.
print('The user who has written more tweets is:')
for (user, freq) in resultado2:
    print('{} : {}'.format(user, freq))

The user who has written more tweets is:
@Big12Conference : 143


#### 3. The user who has written the shortest tweet and its length.

In [12]:
# For every tweet build (user, length of the tweet text, date field) from the
# second, third and fourth tab-separated fields. The result is a one-element
# list holding the triple with the smallest text length.
resultado3 = (
    columnas
    .map(lambda array: (array[1], len(array[2]), array[3]))
    # Only the minimum is needed, so select it with takeOrdered() rather than
    # sorting the whole RDD by length.
    .takeOrdered(1, key=lambda par: par[1])
)

In [13]:
# Report the author, length and timestamp of the shortest tweet.
print('The shortest tweet has written by:')
for (user, len_tuit, time) in resultado3:
    print('User: {} - Tweet lenght: {} - Time and date of the tweet: {}'.format(user, len_tuit, time))

The shortest tweet has written by:
User: @kSECKA33 - Tweet lenght: 21 - Time and date of the tweet: Thu Mar 13 06:16:18 CET 2014


In [14]:
# Shut down the SparkContext used by the RDD section.
spark_context.stop()

# DataFrame

In [15]:
from pyspark.sql import SparkSession, functions
from pyspark.sql.functions import explode, col

In [16]:
# Obtain the SparkSession for the DataFrame section, using a local master
# with 4 worker threads.
spark_session = (
    SparkSession.builder
    .master("local[4]")
    .getOrCreate()
)

In [17]:
# Reduce console noise: set the "org" log4j logger to WARN, reaching log4j
# through the session's `_jvm` handle.
logger = spark_session._jvm.org.apache.log4j
org_logger = logger.LogManager.getLogger("org")
org_logger.setLevel(logger.Level.WARN)

In [18]:
# Load the tab-separated tweets file (no header row, column types inferred)
# and keep the DataFrame cached, since it is queried several times below.
# The path comes from `ruta`, defined once near the top of the notebook, so
# both sections read the same file and the location is not duplicated here.
data_frame = (
    spark_session.read
    .options(header='false', inferschema='true')
    .option("delimiter", "\t")
    .csv(ruta)
    .persist()
)

In [19]:
# Print the column names and inferred types of the loaded DataFrame.
data_frame.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [20]:
# Preview the first rows of the loaded DataFrame.
data_frame.show()

+------------------+----------------+--------------------+--------------------+---+---+--------------------+
|               _c0|             _c1|                 _c2|                 _c3|_c4|_c5|                 _c6|
+------------------+----------------+--------------------+--------------------+---+---+--------------------+
|444004634718261248|       @chief800|RT Big12Conferenc...|Thu Mar 13 07:58:...|   |   |Kansas City Metro...|
|443996287617081345|     @VRamirezTX|RT Big12Conferenc...|Thu Mar 13 07:25:...|   |   |               Austi|
|443993517896261633|@Big12Conference|#Big12Insider Wen...|Thu Mar 13 07:14:...|   |   |                null|
|443990634773229569|      @OhYouGirl|RT Big12Conferenc...|Thu Mar 13 07:03:...|   |   |    OOOOOOOOOklahoma|
|443982530169942016|         @ROK_NB|Great first day o...|Thu Mar 13 06:30:...|   |   |                 DFW|
|443980991082688512|       @barrin_l|Iowa State begins...|Thu Mar 13 06:24:...|   |   |                null|
|443976199887282176

#### 1. The 10 most repeated words in all the tweets.

In [21]:
# Tokens excluded from the word ranking (retweet marker, a link, hashtags and
# common English words). The tweet text is split on a single space, so a token
# can never be ' '; the blank token produced by consecutive spaces is the
# empty string, which is what is listed here.
lista = [
    'RT', 'http://t.co/t3KHlNvtIz', 'Big12Conference:', 'I',
    '#Big12', '#Big12MBB:', '#Big12MBB', '', 'set',
    'the', 'and', 'will', 'is', 'has', 'The',
    'of', 'a', 'in', 'to', 'at', 'with', '-', 'for', 'on',
]

In [22]:
# List the column names of the loaded DataFrame.
data_frame.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6']

In [23]:
# One row per token: split the tweet text (_c2) on single spaces and explode
# the resulting array into a column named "words".
data_frame_2 = data_frame.select(
    explode(functions.split("_c2", ' ')).alias("words")
)

In [24]:
# Discard the excluded tokens, count the occurrences of each remaining word
# and display the 10 most frequent ones.
palabras_validas = data_frame_2.filter(~col("words").isin(lista))
conteo_palabras = palabras_validas.groupBy("words").count()
conteo_palabras.sort("count", ascending=False).show(10)

+---------------+-----+
|          words|count|
+---------------+-----+
|Big12Conference|  191|
|   Championship|  134|
|   quarterfinal|  127|
|     Thursday's|  125|
|      BaylorMBB|  124|
|        bracket|  117|
|        updated|  117|
|       matchups|  117|
|  TCUBasketball|  109|
|       tomorrow|   97|
+---------------+-----+
only showing top 10 rows



#### 2. The user who has written the most tweets.

In [25]:
# Count the tweets of each author (_c1) and display the one with the most,
# labelling the author column "User".
tuits_por_usuario = data_frame.select(col("_c1")).groupBy("_c1").count()
(
    tuits_por_usuario
    .sort("count", ascending=False)
    .withColumnRenamed("_c1", "User")
    .show(1)
)

+----------------+-----+
|            User|count|
+----------------+-----+
|@Big12Conference|  143|
+----------------+-----+
only showing top 1 row



#### 3. The user who has written the shortest tweet and its length.

In [26]:
# Keep the author, tweet text and date columns under readable names.
# Note: at this stage "Tweet length" still holds the tweet text itself.
data_frame_3 = data_frame.select(
    col("_c1").alias("User"),
    col("_c2").alias("Tweet length"),
    col("_c3").alias("Date and time"),
)

In [27]:
# Replace the tweet text with its character length and display the row with
# the smallest length (author, length, date).
(
    data_frame_3
    .select(
        col("User"),
        # Name the computed column directly with alias() instead of renaming
        # Spark's auto-generated "length(Tweet length)" column afterwards.
        functions.length(col("Tweet length")).alias("Tweet length"),
        col("Date and time"),
    )
    .sort("Tweet length", ascending=True)
    .show(1)
)

+---------+------------+--------------------+
|     User|Tweet length|       Date and time|
+---------+------------+--------------------+
|@kSECKA33|          21|Thu Mar 13 06:16:...|
+---------+------------+--------------------+
only showing top 1 row

