In [26]:
import findspark
findspark.init()

In [27]:
import pyspark
import os
import subprocess
import time
import datetime
import operator

from subprocess import Popen, PIPE
from pyspark.sql import SparkSession
from collections import defaultdict
from pyspark.sql.functions import col, asc,desc
import pyspark.sql.functions as F
from pyspark.sql.functions import col, max as max_, min as min_, first, when

In [28]:
spark = SparkSession.builder.appName("Activity2").getOrCreate()

## Increased Followers

In [35]:
hdfs_prefix = 'hdfs://localhost:54310'
# hdfs_prefix = 'hdfs://localhost:9000'
prefix = f'{hdfs_prefix}/datasets/covid/'
filename_prefix = prefix+'part-'
filename_suffix = '-5f4af8d5-3171-48e9-9a56-c5a7c7a84cc3-c000.json'

filenames = []
for i in range(2):
    filenames.append(filename_prefix+f"{i:05d}"+filename_suffix)
filenames

['hdfs://localhost:54310/datasets/covid/part-00000-5f4af8d5-3171-48e9-9a56-c5a7c7a84cc3-c000.json',
 'hdfs://localhost:54310/datasets/covid/part-00001-5f4af8d5-3171-48e9-9a56-c5a7c7a84cc3-c000.json']

In [22]:
# Read all into a dataframe
df = spark.read.json(filenames)

In [23]:
# Convert datetime string to datetime type
df = df.withColumn("created_at", col("created_at").cast("timestamp"))

In [24]:
df.columns

['account_created_at',
 'account_lang',
 'country_code',
 'created_at',
 'favourites_count',
 'followers_count',
 'friends_count',
 'is_quote',
 'is_retweet',
 'lang',
 'place_full_name',
 'place_type',
 'reply_to_status_id',
 'retweet_count',
 'screen_name',
 'source',
 'status_id',
 'text',
 'user_id',
 'verified']

In [37]:
# Process every tweet (using regular for loop)
verified_followers_min = defaultdict(lambda : (datetime.datetime.max,0))
verified_followers_max = defaultdict(lambda : (datetime.datetime.min,0))
active_counter = defaultdict(int)
verifieds = set()

for tweet in df.collect():
    name = tweet.screen_name
    verified = (tweet.verified == 'TRUE')
    #print(tweet.screen_name)
    
    if (verified):
        followers = int(tweet.followers_count)
        tweet_date = datetime.datetime.strptime(tweet.created_at, '%Y-%m-%dT%H:%M:%SZ')
        if (tweet_date < verified_followers_min[name][0]):
            verified_followers_min[name] = (tweet_date,followers)
        if (tweet_date > verified_followers_max[name][0]):
            verified_followers_max[name] = (tweet_date,followers)
        verifieds.add(name)
        #print(tweet.screen_name)
    active_counter[name] += 1

In [38]:
# Find how much each verified gained
verified_delta = {}
for key,(_, count_final) in verified_followers_max.items():
    count_initial = verified_followers_min[key][1]
    verified_delta[key] = count_final-count_initial
# Sort it
verified_delta = list(sorted(verified_delta.items(), key=operator.itemgetter(1),reverse=True))
verified_delta = dict(verified_delta[:1000])
verified_delta

{'CECCgov': 7000,
 'NewsweekEspanol': 0,
 'tvnnoticias': 0,
 'TUDNUSA': 0,
 'ANCALERTS': 0,
 'SSalud_mx': 0,
 'AlBayanNews': 0}

In [39]:
# Find the most active users
active_counter = list(sorted(active_counter.items(), key=operator.itemgetter(1),reverse=True))
active_counter = dict(active_counter[:1000])
active_counter

{'CECCgov': 3,
 'ANCALERTS': 2,
 'GradaNorteMX': 1,
 'IMSS_SanLuis': 1,
 'Milenio': 1,
 'NewsweekEspanol': 1,
 'tvnnoticias': 1,
 'TUDNUSA': 1,
 'SSalud_mx': 1,
 'AlBayanNews': 1,
 'americonsumer': 1,
 'Haines4Laura': 1,
 'temyboy': 1,
 'leblanc_isa': 1}

In [40]:
# Output into
output_file = "myout.csv"
csv_file = open(output_file, "w")
for acc, follower_delta in verified_delta.items():
    active = (acc in active_counter)
    csv_file.write(acc+","+str(follower_delta)+","+str(int(active))+"\n")
csv_file.close()

## Using queries

In [97]:
# Find the 1000 most active users
actives = (
    df \
        .groupby("screen_name") \
        .count() \
        .orderBy(col("count").desc()) \
        .limit(1000) \
)
actives.show()

+---------------+-----+
|    screen_name|count|
+---------------+-----+
|        CECCgov|    3|
|      ANCALERTS|    2|
|      SSalud_mx|    1|
|        Milenio|    1|
|   IMSS_SanLuis|    1|
|    AlBayanNews|    1|
|  americonsumer|    1|
|   GradaNorteMX|    1|
|NewsweekEspanol|    1|
|        TUDNUSA|    1|
|    tvnnoticias|    1|
|   Haines4Laura|    1|
|        temyboy|    1|
|    leblanc_isa|    1|
+---------------+-----+



In [93]:
# How many followers at the start of dataset, for verified accounts
fol_start = \
(
    df
        .filter(col("Verified") == 'TRUE')
        .withColumn("AB", F.struct("created_at", "followers_count"))
        .groupby("screen_name")
        # F.max(AB) selects AB-combinations with max `A`. If more
        # than one combination remains the one with max `B` is selected. If
        # after this identical combinations remain, a single one of them is picked
        # randomly.
        .agg(F.min("AB").alias("max_AB"))
        .select("screen_name", "max_AB.created_at", F.expr("max_AB.followers_count"))
        
)
fol_start.show()

+---------------+-------------------+---------------+
|    screen_name|         created_at|followers_count|
+---------------+-------------------+---------------+
|      ANCALERTS|2020-03-30 21:00:00|        4873740|
|    AlBayanNews|2020-03-30 21:00:00|         754538|
|        CECCgov|2020-03-31 20:59:59|          34367|
|NewsweekEspanol|2020-03-30 21:00:00|          87295|
|      SSalud_mx|2020-03-30 21:00:00|         838824|
|        TUDNUSA|2020-03-30 21:00:00|         632332|
|    tvnnoticias|2020-03-30 21:00:00|         809756|
+---------------+-------------------+---------------+



In [94]:
# How many followers at the end of dataset, for verified accounts
fol_end = \
(
    df
        .filter(col("Verified") == 'TRUE')
        .withColumn("AB", F.struct("created_at", "followers_count"))
        .groupby("screen_name")
        # F.max(AB) selects AB-combinations with max `A`. If more
        # than one combination remains the one with max `B` is selected. If
        # after this identical combinations remain, a single one of them is picked
        # randomly.
        .agg(F.max("AB").alias("max_AB"))
        .select("screen_name", "max_AB.created_at", F.expr("max_AB.followers_count"))
        
)
fol_end.show()

+---------------+-------------------+---------------+
|    screen_name|         created_at|followers_count|
+---------------+-------------------+---------------+
|      ANCALERTS|2020-03-30 21:00:00|        4873740|
|    AlBayanNews|2020-03-30 21:00:00|         754538|
|        CECCgov|2020-04-25 20:59:59|          41367|
|NewsweekEspanol|2020-03-30 21:00:00|          87295|
|      SSalud_mx|2020-03-30 21:00:00|         838824|
|        TUDNUSA|2020-03-30 21:00:00|         632332|
|    tvnnoticias|2020-03-30 21:00:00|         809756|
+---------------+-------------------+---------------+



In [95]:
# Join the two
joined = fol_end \
    .select("screen_name",fol_end.followers_count.alias("end_count")) \
    .join(fol_start, on="screen_name") 
joined.show()

+---------------+---------+-------------------+---------------+
|    screen_name|end_count|         created_at|followers_count|
+---------------+---------+-------------------+---------------+
|      ANCALERTS|  4873740|2020-03-30 21:00:00|        4873740|
|    AlBayanNews|   754538|2020-03-30 21:00:00|         754538|
|        CECCgov|    41367|2020-03-31 20:59:59|          34367|
|NewsweekEspanol|    87295|2020-03-30 21:00:00|          87295|
|      SSalud_mx|   838824|2020-03-30 21:00:00|         838824|
|        TUDNUSA|   632332|2020-03-30 21:00:00|         632332|
|    tvnnoticias|   809756|2020-03-30 21:00:00|         809756|
+---------------+---------+-------------------+---------------+



In [96]:
# How many followers gained the (top 1000 verified users)
delta_df  = (
    joined \
        .withColumn('Result', ( joined["end_count"] - joined["followers_count"] ) ) \
        .select("screen_name","Result") \
        .sort(desc("Result")) \
        .limit(1000)
        .withColumn("Result", col("Result").cast("int"))
)
delta_df.show()

+---------------+------+
|    screen_name|Result|
+---------------+------+
|        CECCgov|  7000|
|      ANCALERTS|     0|
|    AlBayanNews|     0|
|NewsweekEspanol|     0|
|      SSalud_mx|     0|
|        TUDNUSA|     0|
|    tvnnoticias|     0|
+---------------+------+



In [99]:
# Find the 1000 most active users
actives = (
    df \
        .groupby("screen_name") \
        .count() \
        .orderBy(col("count").desc()) \
        .limit(3) \
)
actives.show()

+-----------+-----+
|screen_name|count|
+-----------+-----+
|    CECCgov|    3|
|  ANCALERTS|    2|
|  SSalud_mx|    1|
+-----------+-----+



In [116]:
result = (
delta_df \
    .join(actives, on="screen_name", how="left") \
    .select("screen_name", col("Result").alias("Foll. Gained"), when(col("count").isNotNull(), 1).otherwise(0).alias("IsActive")) \
)
result.show()

+---------------+------------+--------+
|    screen_name|Foll. Gained|IsActive|
+---------------+------------+--------+
|        CECCgov|        7000|       1|
|      ANCALERTS|           0|       1|
|    AlBayanNews|           0|       0|
|NewsweekEspanol|           0|       0|
|      SSalud_mx|           0|       1|
|        TUDNUSA|           0|       0|
|    tvnnoticias|           0|       0|
+---------------+------------+--------+



In [119]:
result.coalesce(1).write.csv("myout3.csv")

In [41]:
# Find the 1000 most active *verified* users
df \
    .filter("verified == 'TRUE'") \
    .groupby("screen_name") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1000) \
    .show()

+---------------+-----+
|    screen_name|count|
+---------------+-----+
|        CECCgov|    3|
|      ANCALERTS|    2|
|      SSalud_mx|    1|
|    AlBayanNews|    1|
|NewsweekEspanol|    1|
|        TUDNUSA|    1|
|    tvnnoticias|    1|
+---------------+-----+



In [47]:
# Last tweet of that account in the dataset
df  \
    .groupBy("screen_name") \
    .agg(max_("created_at"), first("followers_count")) \
    .show()

+---------------+-------------------+----------------------+
|    screen_name|    max(created_at)|first(followers_count)|
+---------------+-------------------+----------------------+
|      ANCALERTS|2020-03-30 21:00:00|               4873740|
|    AlBayanNews|2020-03-30 21:00:00|                754538|
|        CECCgov|2020-04-25 20:59:59|                 34367|
|   GradaNorteMX|2020-03-30 21:00:00|                  1846|
|   Haines4Laura|2020-03-31 20:59:59|                  1125|
|   IMSS_SanLuis|2020-03-30 21:00:00|                  1012|
|        Milenio|2020-03-30 21:00:00|                  null|
|NewsweekEspanol|2020-03-30 21:00:00|                 87295|
|      SSalud_mx|2020-03-30 21:00:00|                838824|
|        TUDNUSA|2020-03-30 21:00:00|                632332|
|  americonsumer|2020-03-31 20:59:59|                  3176|
|    leblanc_isa|2020-03-31 20:59:59|                   953|
|        temyboy|2020-03-31 20:59:59|                  8229|
|    tvnnoticias|2020-03

In [19]:
# First tweet of that account in the dataset
df \
    .groupBy("screen_name") \
    .agg(min_("created_at")) \
    .show()

+---------------+-------------------+
|    screen_name|    min(created_at)|
+---------------+-------------------+
|      SSalud_mx|2020-03-30 21:00:00|
|      ANCALERTS|2020-03-30 21:00:00|
|        Milenio|2020-03-30 21:00:00|
|   IMSS_SanLuis|2020-03-30 21:00:00|
|    AlBayanNews|2020-03-30 21:00:00|
|  americonsumer|2020-03-31 20:59:59|
|   GradaNorteMX|2020-03-30 21:00:00|
|NewsweekEspanol|2020-03-30 21:00:00|
|        TUDNUSA|2020-03-30 21:00:00|
|    tvnnoticias|2020-03-30 21:00:00|
|   Haines4Laura|2020-03-31 20:59:59|
|        temyboy|2020-03-31 20:59:59|
|    leblanc_isa|2020-03-31 20:59:59|
|        CECCgov|2020-03-31 20:59:59|
+---------------+-------------------+



In [27]:
# First tweet of that account in the dataset
df \
    .groupBy("screen_name") \
    .agg(min_("created_at"), min_("followers_count")) \
    .show()

+---------------+-------------------+--------------------+
|    screen_name|    min(created_at)|min(followers_count)|
+---------------+-------------------+--------------------+
|      ANCALERTS|2020-03-30 21:00:00|             4873740|
|    AlBayanNews|2020-03-30 21:00:00|              754538|
|        CECCgov|2020-03-31 20:59:59|               34367|
|   GradaNorteMX|2020-03-30 21:00:00|                1846|
|   Haines4Laura|2020-03-31 20:59:59|                1125|
|   IMSS_SanLuis|2020-03-30 21:00:00|                1012|
|        Milenio|2020-03-30 21:00:00|                null|
|NewsweekEspanol|2020-03-30 21:00:00|               87295|
|      SSalud_mx|2020-03-30 21:00:00|              838824|
|        TUDNUSA|2020-03-30 21:00:00|              632332|
|  americonsumer|2020-03-31 20:59:59|                3176|
|    leblanc_isa|2020-03-31 20:59:59|                 953|
|        temyboy|2020-03-31 20:59:59|                8229|
|    tvnnoticias|2020-03-30 21:00:00|              80975

In [193]:
df2
    .with

.groupBy('screen_name').agg(f.max_('created_at')).show()

AttributeError: 'str' object has no attribute 'max_'

In [183]:
from pyspark.sql import Window

In [185]:
w = Window.partitionBy('screen_name')
df2.withColumn('maxB', f.max('created_at').over(w))\
    .where(f.col('created_at') == f.col('maxB'))\
    .drop('maxB')\
    .show()

AttributeError: 'str' object has no attribute 'max'

In [182]:
df \
    .select(min("created_at")) \
    .filter("verified == 'TRUE'") \
    .groupby("screen_name") \
    .show()

AnalysisException: cannot resolve '_' given input columns: [account_created_at, account_lang, country_code, created_at, favourites_count, followers_count, friends_count, is_quote, is_retweet, lang, place_full_name, place_type, reply_to_status_id, retweet_count, screen_name, source, status_id, text, user_id, verified];
'Project ['_]
+- Relation [account_created_at#313,account_lang#314,country_code#315,created_at#316,favourites_count#317,followers_count#318,friends_count#319,is_quote#320,is_retweet#321,lang#322,place_full_name#323,place_type#324,reply_to_status_id#325,retweet_count#326,screen_name#327,source#328,status_id#329,text#330,user_id#331,verified#332] json


# OFF

In [4]:
spark1 = SparkSession.builder.appName("Ops").getOrCreate()

In [2]:
import pyspark
import random
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000
def inside(p):     
    x, y = random.random(), random.random()
    return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

3.14143748


In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("HelloLines").getOrCreate()

In [52]:
# rdd = sc.textFile("hdfs:/user/cunha/hello.txt")
sc = spark.sparkContext
rdd = sc.textFile("part0.json")

lines = rdd.count()
outrdd = sc.parallelize([lines])

In [49]:
df = spark.read.json(["part0.json","part1.json", "part2.json"])

In [50]:
df.columns

['account_created_at',
 'account_lang',
 'country_code',
 'created_at',
 'favourites_count',
 'followers_count',
 'friends_count',
 'is_quote',
 'is_retweet',
 'lang',
 'place_full_name',
 'place_type',
 'reply_to_status_id',
 'retweet_count',
 'screen_name',
 'source',
 'status_id',
 'text',
 'user_id',
 'verified']

In [51]:
df.created_at

Column<'created_at'>

In [52]:
# Find the verified accounts
verifieds = df.filter(df.verified == 'TRUE')
verifieds.count()

9

In [53]:
import time
import datetime

In [54]:
date_time_str = '2020-03-31T00:00:00Z'
date_time_obj1 = datetime.datetime.strptime(date_time_str, '%Y-%m-%dT%H:%M:%SZ')
date_time_obj1

datetime.datetime(2020, 3, 31, 0, 0)

In [55]:
date_time_str = '2020-04-20T00:00:00Z'
date_time_obj2 = datetime.datetime.strptime(date_time_str, '%Y-%m-%dT%H:%M:%SZ')
date_time_obj2

datetime.datetime(2020, 4, 20, 0, 0)

In [56]:
date_time_obj1 < 0

TypeError: '<' not supported between instances of 'datetime.datetime' and 'int'

In [21]:
type(date_time_obj)

datetime.datetime

In [38]:
int(verifieds.collect()[0].followers_count)

87295

In [12]:
verified_followers = {}

In [44]:
datetime.datetime.min

datetime.datetime(1, 1, 1, 0, 0)

In [45]:
datetime.datetime.max

datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)

In [57]:
from collections import defaultdict

verified_followers_min = defaultdict(lambda : (datetime.datetime.max,0))
verified_followers_max = defaultdict(lambda : (datetime.datetime.min,0))

In [58]:
for acc in verifieds.collect():
    name = acc.screen_name
    followers = int(acc.followers_count)
    tweet_date = datetime.datetime.strptime(acc.created_at, '%Y-%m-%dT%H:%M:%SZ')
    
    if (tweet_date < verified_followers_min[name][0]):
        verified_followers_min[name] = (tweet_date,followers)
    if (tweet_date > verified_followers_max[name][0]):
        verified_followers_max[name] = (tweet_date,followers)
    print(acc.screen_name)

NewsweekEspanol
tvnnoticias
TUDNUSA
ANCALERTS
ANCALERTS
SSalud_mx
AlBayanNews
CECCgov
CECCgov


In [59]:
verified_followers_min

defaultdict(<function __main__.<lambda>()>,
            {'NewsweekEspanol': (datetime.datetime(2020, 3, 31, 0, 0), 87295),
             'tvnnoticias': (datetime.datetime(2020, 3, 31, 0, 0), 809756),
             'TUDNUSA': (datetime.datetime(2020, 3, 31, 0, 0), 632332),
             'ANCALERTS': (datetime.datetime(2020, 3, 31, 0, 0), 4873740),
             'SSalud_mx': (datetime.datetime(2020, 3, 31, 0, 0), 838824),
             'AlBayanNews': (datetime.datetime(2020, 3, 31, 0, 0), 754538),
             'CECCgov': (datetime.datetime(2020, 3, 31, 23, 59, 59), 34367)})

In [60]:
verified_followers_max

defaultdict(<function __main__.<lambda>()>,
            {'NewsweekEspanol': (datetime.datetime(2020, 3, 31, 0, 0), 87295),
             'tvnnoticias': (datetime.datetime(2020, 3, 31, 0, 0), 809756),
             'TUDNUSA': (datetime.datetime(2020, 3, 31, 0, 0), 632332),
             'ANCALERTS': (datetime.datetime(2020, 3, 31, 0, 0), 4873740),
             'SSalud_mx': (datetime.datetime(2020, 3, 31, 0, 0), 838824),
             'AlBayanNews': (datetime.datetime(2020, 3, 31, 0, 0), 754538),
             'CECCgov': (datetime.datetime(2020, 4, 20, 23, 59, 59), 99367)})

In [63]:
verified_delta = {}
for key,(_, count_final) in verified_followers_max.items():
    count_initial = verified_followers_min[key][1]
    verified_delta[key] = count_final-count_initial

In [64]:
verified_delta

{'NewsweekEspanol': 0,
 'tvnnoticias': 0,
 'TUDNUSA': 0,
 'ANCALERTS': 0,
 'SSalud_mx': 0,
 'AlBayanNews': 0,
 'CECCgov': 65000}

In [51]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df2 = spark.createDataFrame(data=data2,schema=schema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [43]:
df.count()

12

In [36]:
df.collect()

[Row(account_created_at='2014-08-10T21:20:32Z', account_lang=None, country_code=None, created_at='2020-03-31T00:00:00Z', favourites_count='1472', followers_count='1846', friends_count='252', is_quote='FALSE', is_retweet='FALSE', lang='es', reply_to_status_id=None, retweet_count='0', screen_name='GradaNorteMX', source='TweetDeck', status_id='1244776423073542144', text='Cuando mejor iban las cosas en el circuito de tenis universitario de Estados Unidos, el #sonorense Alán Rubio volvió a Hermosillo ante la pandemia del #coronavirus 😕🤦🏽\u200d♂️🇲🇽🇺🇸  🎾 https://t.co/SldPvrP81A https://t.co/7x8W8MwQuR', user_id='2722502906', verified='FALSE'),
 Row(account_created_at='2017-05-04T22:00:38Z', account_lang=None, country_code=None, created_at='2020-03-31T00:00:00Z', favourites_count='300', followers_count='1012', friends_count='41', is_quote='FALSE', is_retweet='FALSE', lang='es', reply_to_status_id=None, retweet_count='0', screen_name='IMSS_SanLuis', source='TweetDeck', status_id='12447764235937

In [None]:
list_of_jsons = rdd.collect()

In [27]:
len(rdd.collect())

12

In [20]:
lines

12

In [None]:
# The following will fail if the output directory exists:
# outrdd.saveAsTextFile("hdfs:/user/cunha/hello-linecount-submit")
outrdd.saveAsTextFile("read02")

In [None]:
sc.stop()