In [472]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Test") \
    .config("spark.sql.shuffle.partitions", 6) \
    .getOrCreate()


## 1. Считывание файла с твитами и преобразование его в винтрину

In [473]:
json_tweets_df = spark.read.json("three_minutes_tweets.json")

In [475]:
tweets_mart_df = json_tweets_df.select(
    json_tweets_df.user.screen_name.alias("screen_name"), 
    F.coalesce(json_tweets_df.text, F.lit("")).alias("tweet_text"), 
    F.from_unixtime(json_tweets_df.timestamp_ms / 1000).alias("created_at"), 
    json_tweets_df.place.country_code.alias("country_code"), 
    json_tweets_df.place.full_name.alias("location"),
    json_tweets_df.lang,
    json_tweets_df.id_str.alias("tweet_id"),
    json_tweets_df.user.profile_image_url.alias("display_url")
).cache()
del json_tweets_df

In [476]:
tweets_mart_df.limit(10).toPandas()

Unnamed: 0,screen_name,tweet_text,created_at,country_code,location,lang,tweet_id,display_url
0,,,,,,,,
1,snoow3333,ايه الأكل 😜,2015-08-16 21:41:13,,,ar,6.330307796190126e+17,http://pbs.twimg.com/profile_images/6153385524...
2,balwinderstyles,RT @nitishuna: @LasVegasChicas @I_luv_reds @Di...,2015-08-16 21:41:13,,,und,6.33030779610665e+17,http://pbs.twimg.com/profile_images/5995891627...
3,eqtybas,إنّ العرب إذا تغلبوا على أوطان أسرع إليها الخر...,2015-08-16 21:41:13,,,ar,6.330307796315668e+17,http://pbs.twimg.com/profile_images/6154427936...
4,Anaalzeenn,RT @moonmona7: لن الومك على رِحيلك ...\nفَـقٌـ...,2015-08-16 21:41:13,,,ar,6.330307796064666e+17,http://pbs.twimg.com/profile_images/5126110529...
5,BraydenKade,@16_Pirates any shots?,2015-08-16 21:41:13,,,en,6.330307796232069e+17,http://pbs.twimg.com/profile_images/6225586675...
6,Halima1_,RT @Cancer_gk: A #Cancer may act shy and quiet...,2015-08-16 21:41:13,,,en,6.330307796022436e+17,http://pbs.twimg.com/profile_images/6203703546...
7,jazzinique2015,@1voodoochild thanks for the follow💯,2015-08-16 21:41:13,,,en,6.33030779627434e+17,http://pbs.twimg.com/profile_images/6100753306...
8,ComeOnBunny,"""shitting on otherkin is ablist and transphobi...",2015-08-16 21:41:13,,,en,6.330307796400906e+17,http://pbs.twimg.com/profile_images/6266704114...
9,xspearx330,112 #MTVHottest Britney Spears,2015-08-16 21:41:13,,,en,6.330307796233748e+17,http://abs.twimg.com/sticky/default_profile_im...


## 2. Считывание файла AFINN-111.txt в датафрейм

In [477]:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType

afinn_schema = StructType([
    StructField("word", StringType()),
    StructField("sentiment", IntegerType())
])

afinn_df = spark \
    .read \
    .option("delimiter", '\t') \
    .schema(afinn_schema) \
    .csv("AFINN-111.txt")

afinn_df.cache()

DataFrame[word: string, sentiment: int]

In [478]:
afinn_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- sentiment: integer (nullable = true)



In [479]:
afinn_df.limit(10).toPandas()

Unnamed: 0,word,sentiment
0,abandon,-2
1,abandoned,-2
2,abandons,-2
3,abducted,-2
4,abduction,-2
5,abductions,-2
6,abhor,-3
7,abhorred,-3
8,abhorrent,-3
9,abhors,-3


## 3. Подсчет сентиментов для каждого твита

In [480]:
tweets_sentiments_df = tweets_mart_df \
    .withColumn(
        'words_list',
        # оставляем только слова и теги юзеров
        F.split(F.regexp_replace(tweets_mart_df.tweet_text, '[^@\w]', " "), " ")) \
    .select(tweets_mart_df.tweet_id,
           tweets_mart_df.tweet_text,
           tweets_mart_df.screen_name,
           tweets_mart_df.created_at,
           tweets_mart_df.country_code,
           tweets_mart_df.location,
           tweets_mart_df.lang,
           tweets_mart_df.display_url,
           F.col('words_list'),
           F.explode(F.col("words_list")).alias("word")) \
    .join(F.broadcast(afinn_df), "word", how="outer",) \
    .groupBy(tweets_mart_df.tweet_id,
             tweets_mart_df.tweet_text,
             tweets_mart_df.screen_name,
             tweets_mart_df.created_at,
             tweets_mart_df.country_code,
             tweets_mart_df.location,
             tweets_mart_df.lang,
             tweets_mart_df.display_url,
             F.col('words_list')) \
    .agg(F.sum("sentiment").alias('sentiment')) \
    .fillna(0) \
    .cache()

In [481]:
tweets_sentiments_df.limit(10).toPandas()

Unnamed: 0,tweet_id,tweet_text,screen_name,created_at,country_code,location,lang,display_url,words_list,sentiment
0,633031266162577408,(لا جرم أن الله يعلم ما يسرون وما يعلنون إنه ل...,noof123321,2015-08-16 21:43:09,,,ar,http://pbs.twimg.com/profile_images/5278805682...,"[, , , , , , , , , , , , , , , , , , , , , , ,...",0
1,633030779640090624,"""shitting on otherkin is ablist and transphobi...",ComeOnBunny,2015-08-16 21:41:13,,,en,http://pbs.twimg.com/profile_images/6266704114...,"[, shitting, on, otherkin, is, ablist, and, tr...",-4
2,633031266150055936,Трамп предложил повысить цены на визы для мекс...,ihnotksq,2015-08-16 21:43:09,,,ru,http://abs.twimg.com/sticky/default_profile_im...,"[, , , , , , , , , , , , , , , , , , , , , , ,...",0
3,633030779619020800,これ、これ！！！\nまじで50万楽勝だったwww\n 【http://t.co/Q6oJ3...,obuse666,2015-08-16 21:41:13,,,ja,http://abs.twimg.com/sticky/default_profile_im...,"[, , , , , , , , , , , , 50, , , , , , www, , ...",0
4,633030779610640384,@tenu_san 実習です寝れません…,rnyk220,2015-08-16 21:41:13,,,ja,http://pbs.twimg.com/profile_images/6316983580...,"[@tenu_san, , , , , , , , , , , ]",0
5,633030779623358464,amk filmi beynimin içine etti ya düşünemiyorum...,sakinlesseda,2015-08-16 21:41:13,,,tr,http://pbs.twimg.com/profile_images/6312493693...,"[amk, filmi, beynimin, i, ine, etti, ya, d, , ...",0
6,633030779619147776,RT @PerfectBaits: http://t.co/9Y8VAB9Gbq,soyunfelinomiau,2015-08-16 21:41:13,,,und,http://pbs.twimg.com/profile_images/6177188387...,"[RT, @PerfectBaits, , http, , , t, co, 9Y8VAB9...",0
7,633030779627438080,ふぅ。ねみ,lawton_mm,2015-08-16 21:41:13,,,ja,http://pbs.twimg.com/profile_images/4907593240...,"[, , , , , ]",0
8,633031270365204480,Estamos repasando la carrera de Gustavo Cerati...,ROCKANROLACBUS,2015-08-16 21:43:10,,,es,http://pbs.twimg.com/profile_images/6300097225...,"[Estamos, repasando, la, carrera, de, Gustavo,...",0
9,633030779619119104,RT @PolloVignolo: Coincido plenamente señor ga...,JustCabj,2015-08-16 21:41:13,,,es,http://pbs.twimg.com/profile_images/6284335557...,"[RT, @PolloVignolo, , Coincido, plenamente, se...",0


## 4. Pyspark запросы (6 штук), которые выводят по 5 наиболее и наименее счастливых стран, локаций и пользователей, а также среднюю эмоциональную окраску их твитов

#### 4.1 Наиболее счастивые страны

In [482]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.country_code,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.country_code.isNotNull()) \
    .groupBy(tweets_sentiments_df.country_code) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment', ascending=False) \
    .limit(5) \
    .toPandas()

Unnamed: 0,country_code,avg_sentiment
0,IE,5.0
1,PL,4.5
2,DK,4.0
3,DE,0.666667
4,US,0.52


#### 4.2 Наименее счастливые страны

In [483]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.country_code,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.country_code.isNotNull()) \
    .groupBy(tweets_sentiments_df.country_code) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment') \
    .limit(5) \
    .toPandas()

Unnamed: 0,country_code,avg_sentiment
0,PH,-3.0
1,PT,-1.25
2,VE,-0.5
3,AR,-0.4
4,CL,-0.25


#### 4.3 Наиболее счастливые локации

In [484]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.location,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.location.isNotNull()) \
    .groupBy(tweets_sentiments_df.location) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment', ascending=False) \
    .limit(5) \
    .toPandas()

Unnamed: 0,location,avg_sentiment
0,Polska,9.0
1,"Los Angeles, CA",5.5
2,"Clare, Ireland",5.0
3,"Lincoln, NE",4.0
4,"Ringkøbing-Skjern, Midtjylland",4.0


#### 4.4 Наименее счастливые локации

In [485]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.location,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.location.isNotNull()) \
    .groupBy(tweets_sentiments_df.location) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment') \
    .limit(5) \
    .toPandas()

Unnamed: 0,location,avg_sentiment
0,"Odivelas, Portugal",-5.0
1,"Oakland, CA",-5.0
2,"Hillingdon, London",-4.0
3,"Taguig City, National Capital Region",-3.0
4,"Rendon, TX",-2.0


#### 4.5 Наиболее счасливые пользователи

In [486]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.screen_name,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.screen_name.isNotNull()) \
    .groupBy(tweets_sentiments_df.screen_name) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment', ascending=False) \
    .limit(5) \
    .toPandas()

Unnamed: 0,screen_name,avg_sentiment
0,94skinnhes,17.0
1,lisacortezza,13.0
2,ChuckieCarrier,12.0
3,laurahowell71,11.0
4,ryanjilka72,9.0


#### 4.6 Наименее счастливые пользователи

In [487]:
tweets_sentiments_df \
    .select(tweets_sentiments_df.screen_name,
            tweets_sentiments_df.sentiment) \
    .where(tweets_sentiments_df.screen_name.isNotNull()) \
    .groupBy(tweets_sentiments_df.screen_name) \
    .agg(F.avg(tweets_sentiments_df.sentiment).alias('avg_sentiment')) \
    .orderBy('avg_sentiment') \
    .limit(5) \
    .toPandas()

Unnamed: 0,screen_name,avg_sentiment
0,SpacceeOutt,-10.0
1,JGoldfarb,-9.0
2,BasedGodJose,-9.0
3,DestiniTeyonna,-9.0
4,Xenomorphica,-9.0


## 5. Наиболее счастливый пользователь, которого упоминали в твитах другие пользователи

In [488]:
tweets_sentiments_df \
    .withColumn('user_tag', F.concat(F.lit('@'), tweets_sentiments_df.screen_name)) \
    .withColumn('mentioned_user', F.explode('words_list')) \
    .where(F.col('mentioned_user').startswith(F.lit('@'))) \
    .where('user_tag != mentioned_user') \
    .select('mentioned_user').distinct() \
    .join(
        tweets_sentiments_df.withColumn('mentioned_user', F.concat(F.lit('@'), tweets_sentiments_df.screen_name)),
        'mentioned_user'
    ) \
    .select('screen_name', 'sentiment') \
    .orderBy('sentiment', ascending=False) \
    .limit(1) \
    .toPandas()


Unnamed: 0,screen_name,sentiment
0,alexmaxam,3


In [489]:
afinn_df.unpersist()
tweets_mart_df.unpersist()
tweets_sentiments_df.unpersist()
spark.stop()

## 6. ETL процесс

1. Забираем сырые данные и грузим на стейдж, например в hdfs.
2. Запускаем spark jobs для трансформации и очистки данных.
3. Валидируем новые данные.
3. Грузим новые данные в хранилище, например в Hive.
4. Обновляем вьюхи и визуалицаии для конечых пользователей.