In [1]:
from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
from pyspark.sql import functions as F
#from googletrans import Translator
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
#from mpl_toolkits.basemap import Basemap
import time
    
# Create (or reuse) the Spark session shared by every cell of this notebook.
spark = (
    SparkSession.builder
    .appName("Large scale human mobility data analysis through social media")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)

# Read all parquet files matching twitter2018*, keep one row per media_id and
# register the de-duplicated frame as the "twitter" SQL view.
twitter_df = spark.read.parquet("/data/twitterdata/twitter2018*.parquet").dropDuplicates(['media_id'])
twitter_df.createOrReplaceTempView("twitter")

# Keep every tweet of any user who has at least one tweet whose country_code
# appears in the list embedded in the query.
european_users_query = "select * from twitter where user_id in (select distinct user_id from twitter where country_code in ('BE','BG','CZ','DK','DE','EE','IE','EL','ES','FR','HR','IT','CY','LV','LT','LU','HU','MT','NL','AT','PL','PT','RO','SI','SK','FI','SE','UK','IS','LI','NO','CH','ME','MK','AL','RS','TR','XK','BA'))"
twitter_df = spark.sql(european_users_query)

# Report the number of distinct media and distinct users that remain.
for id_column in ("media_id", "user_id"):
    twitter_df.agg(F.countDistinct(id_column)).show()

+------------------------+
|count(DISTINCT media_id)|
+------------------------+
|                27093348|
+------------------------+

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                1246295|
+-----------------------+



In [None]:
# Subset of tweets whose `lat` column is not null.
twitter_geotagged = twitter_df.filter(twitter_df['lat'].isNotNull())
# Display the distinct media count of the subset so that re-running this cell
# reproduces the figure shown in its output.
twitter_geotagged.agg(F.countDistinct("media_id")).show()

+------------------------+
|count(DISTINCT media_id)|
+------------------------+
|                 5540371|
+------------------------+



In [5]:
# Share of tweets with a non-null `lat`: distinct media_id count of the
# geotagged subset divided by the distinct media_id count of all kept tweets
# (both numbers copied from the cell outputs above).
geotagged_media_count = 5540371
total_media_count = 27093348
geotagged_media_count / total_media_count

0.20449192916283362

In [2]:
# Raw Instagram tables: users, media (read from two directories) and locations.
instagram_user_df = spark.read.parquet("/data/instagramdata/instagram_user2018*.parquet")
instagram_media1 = spark.read.parquet("/data/instagramdata/instagram_media2018*.parquet")
instagram_media2 = spark.read.parquet("/data/instagramdata1/instagram_media2018*.parquet")
# NOTE(review): union() matches columns by position, not by name — confirm both
# media sources share the same column order.
instagram_media_df = instagram_media1.union(instagram_media2)
instagram_location_df = spark.read.parquet("/data/instagramdata/instagram_location2018_04*.parquet")

# Inner-join users with their media on user_id; the media-side copy of the key is dropped.
same_user = instagram_user_df["user_id"] == instagram_media_df["user_id"]
user_media_df = instagram_user_df.join(instagram_media_df, same_user).drop(instagram_media_df["user_id"])

# Attach the location row on location_id (location-side key dropped) and keep
# a single row per media_id.
same_location = user_media_df["location_id"] == instagram_location_df["location_id"]
instagram_df = (
    user_media_df
    .join(instagram_location_df, same_location)
    .drop(instagram_location_df["location_id"])
    .dropDuplicates(['media_id'])
)
instagram_df.createOrReplaceTempView("instagram")

# Keep every media of any user who has at least one media whose country_id
# appears in the list embedded in the query.
european_users_query = "select * from instagram where user_id in (select distinct user_id from instagram where country_id in ('BE','BG','CZ','DK','DE','EE','IE','EL','ES','FR','HR','IT','CY','LV','LT','LU','HU','MT','NL','AT','PL','PT','RO','SI','SK','FI','SE','UK','IS','LI','NO','CH','ME','MK','AL','RS','TR','XK','BA'))"
instagram_df = spark.sql(european_users_query)
#instagram_df.write.save("/data/instagramdata/instagram10_11.parquet",format="parquet",mode='overwrite')

# Report the number of distinct media and distinct users that remain.
for id_column in ("media_id", "user_id"):
    instagram_df.agg(F.countDistinct(id_column)).show()

+------------------------+
|count(DISTINCT media_id)|
+------------------------+
|                  452613|
+------------------------+

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                  23805|
+-----------------------+



In [None]:
# great-circle distance calculation function
# def dist(long_x, lat_x, long_y, lat_y):
#    return F.acos(
#        F.sin(F.radians(lat_x)) * F.sin(F.radians(lat_y)) +
#        F.cos(F.radians(lat_x)) * F.cos(F.radians(lat_y)) *
#            F.cos(F.radians(long_x) - F.radians(long_y))
#    ) * F.lit(6371.0)

# Haversine formula distance calculation function
def dist(lon1, lat1, lon2, lat2, radius=6371.0):
    """Build a Spark Column expression for the haversine distance between two points.

    Args:
        lon1, lat1: Columns holding longitude / latitude of the first point, in degrees.
        lon2, lat2: Columns holding longitude / latitude of the second point, in degrees.
        radius: Sphere radius; the result is expressed in the same unit
            (default 6371.0, i.e. kilometres).

    Returns:
        A Column with the distance along the sphere. The value is null whenever
        any input coordinate is null.
    """
    dlat = F.radians(lat2 - lat1)
    dlon = F.radians(lon2 - lon1)
    sin_half_dlat = F.sin(dlat / 2)
    sin_half_dlon = F.sin(dlon / 2)
    a = sin_half_dlat * sin_half_dlat + F.cos(F.radians(lat1)) \
        * F.cos(F.radians(lat2)) * sin_half_dlon * sin_half_dlon
    # Floating-point rounding can push `a` marginally above 1 for (near-)antipodal
    # points, which would turn sqrt(1 - a) into NaN. Clamp with when/otherwise
    # rather than least(): a null `a` makes the condition null, so the otherwise
    # branch keeps the null instead of replacing it with 1.0.
    a = F.when(a > 1.0, F.lit(1.0)).otherwise(a)
    c = 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))
    return c * radius

In [None]:
# Compute, per user, the inter-arrival time and inter-distance between consecutive
# tweets, then keep only tweets whose implied speed is <= 1000 km/h.

def _reformat_create_time(raw):
    """Rewrite a '%a %b %d %H:%M:%S +0000 %Y' string as '%Y-%m-%d %H:%M:%S'.

    Returns None for a null input so that a missing create_time becomes a null
    timestamp instead of raising TypeError inside the UDF and failing the job.
    """
    if raw is None:
        return None
    return time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(raw, '%a %b %d %H:%M:%S +0000 %Y'))


changeDateStructure_udf = F.udf(_reformat_create_time)
twitter_df = twitter_df.withColumn(
    "create_time",
    F.to_timestamp(changeDateStructure_udf("create_time"), 'yyyy-MM-dd HH:mm:ss'))

# Per-user window ordered by creation time; lag(…, 1) over it is the user's previous tweet.
w = Window().partitionBy(twitter_df["user_id"]).orderBy(twitter_df["create_time"])

# distance is NaN when using great-circle distance calculation to calculate two equal coordinates
# geodistance_df = twitter_df.withColumn("dist", (when((twitter_df["place_lon"]==lag(twitter_df["place_lon"], 1).over(w))&(twitter_df["place_lat"]==lag(twitter_df["place_lat"],1).over(w)),lit(0.0)).otherwise(dist(
#    twitter_df["place_lon"], twitter_df["place_lat"],
#    lag(twitter_df["place_lon"], 1).over(w), lag(twitter_df["place_lat"], 1).over(w)
# ))).alias("dist"))

# Distance between this tweet's place coordinates and the previous tweet's
# (null for each user's first tweet, because lag() has no previous row).
twitter_df = twitter_df.withColumn("dist", dist(
    twitter_df["place_lon"], twitter_df["place_lat"],
    F.lag(twitter_df["place_lon"], 1).over(w), F.lag(twitter_df["place_lat"], 1).over(w)
))
# calculate timeDiff by day
# w = Window().partitionBy(geodistance_df["user_id"]).orderBy(geodistance_df["create_time"])
# time_space_df = geodistance_df.withColumn("interarrivaltime", datediff(geodistance_df["create_time"],lag(geodistance_df["create_time"], 1).over(w)).alias("interarrivaltime"))

# calculate timeDiff by second
twitter_df = twitter_df.withColumn(
    "interarrivaltime",
    F.unix_timestamp(twitter_df["create_time"])
    - F.unix_timestamp(F.lag(twitter_df["create_time"], 1).over(w)))

#time_space_df = time_space_df.filter(time_space_df["interarrivaltime"].isNotNull())
# Drop tweets posted in the same second as the previous one (would divide by zero
# below); rows with a null interval are kept.
twitter_df = twitter_df.filter((twitter_df['interarrivaltime'] != 0)|(twitter_df['interarrivaltime'].isNull()))
# dist * 3600 / seconds: distance per hour.
twitter_df = twitter_df.withColumn("speed", twitter_df['dist'] * 3600 / twitter_df['interarrivaltime'])
twitter_df = twitter_df.filter((twitter_df["speed"]<= 1000)|(twitter_df["speed"].isNull()))

#twitter_speed_exclusion = twitter_df.select(twitter_df['user_id'],twitter_df['speed'],twitter_df['media_id'],F.lag(twitter_df["media_id"], 1).over(w).alias('p_media_id')).filter(F.col('speed')>1000)
#twitter_speed_exclusion = twitter_speed_exclusion.select(['media_id']).union(twitter_speed_exclusion.select(['p_media_id']))
#twitter_speed_exclusion.createOrReplaceTempView("twitter_speed_exclusion")
#twitter_df = spark.sql("select * from twitter where media_id not in (select media_id from twitter_speed_exclusion)")

twitter_df.agg(F.countDistinct("media_id")).show()
twitter_df.agg(F.countDistinct("user_id")).show()

In [3]:
# Cast create_time_stamp to a Spark timestamp and store it as create_time.
instagram_df = instagram_df.withColumn('create_time', F.col('create_time_stamp').cast(TimestampType()))

# Per-user window ordered by creation time; lag(…, 1) over it is the user's previous media.
w = Window.partitionBy(F.col("user_id")).orderBy(F.col("create_time"))
prev_lon = F.lag(F.col("lon"), 1).over(w)
prev_lat = F.lag(F.col("lat"), 1).over(w)
prev_create_time = F.lag(F.col("create_time"), 1).over(w)

instagram_df = (
    instagram_df
    # distance between this media and the user's previous one
    .withColumn("dist", dist(F.col("lon"), F.col("lat"), prev_lon, prev_lat))
    # time difference to the previous media, in seconds
    .withColumn("interarrivaltime",
                F.unix_timestamp(F.col("create_time")) - F.unix_timestamp(prev_create_time))
    # a zero interval would divide by zero below; null intervals are kept
    .filter(F.col('interarrivaltime').isNull() | (F.col('interarrivaltime') != 0))
    # dist * 3600 / seconds: distance per hour
    .withColumn("speed", F.col('dist') * 3600 / F.col('interarrivaltime'))
    .filter(F.col("speed").isNull() | (F.col("speed") <= 1000))
)
#exclude two consecutive media that imply user relocating with a speed more than 1000km/h
instagram_df.createOrReplaceTempView("instagram")
#instagram_speed_exclusion = instagram_df.select(instagram_df['user_id'],instagram_df['speed'],instagram_df['media_id'],F.lag(instagram_df["media_id"], 1).over(w).alias('p_media_id')).filter(F.col('speed')>1000)
#instagram_speed_exclusion = instagram_speed_exclusion.select(['media_id']).union(instagram_speed_exclusion.select(['p_media_id']))
#instagram_speed_exclusion.createOrReplaceTempView("instagram_speed_exclusion")
#instagram_df = spark.sql("select * from instagram where media_id not in (select media_id from instagram_speed_exclusion)")
#instagram_df = spark.sql("select * from instagram where user_id not in (select user_id from instagram where speed >1000)")
#
#instagram_df = instagram_df.filter(instagram_df['media_id'].isin(instagram_speed_exclusion['media_id'])==False)
#instagram_df = instagram_df.filter(instagram_df['country_id'].isin(['BE','BG','CZ','DK','DE','EE','IE','EL','ES','FR','HR','IT','CY','LV','LT','LU','HU','MT','NL','AT','PL','PT','RO','SI','SK','FI','SE','UK','IS','LI','NO','CH','ME','MK','AL','RS','TR','XK','BA']))
#instagram_df.write.save("/data/instagramdata/instagram_df.parquet",format="parquet",mode='overwrite')

# Report the number of distinct media and distinct users that remain.
for id_column in ("media_id", "user_id"):
    instagram_df.agg(F.countDistinct(id_column)).show()

+------------------------+
|count(DISTINCT media_id)|
+------------------------+
|                  443102|
+------------------------+

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                  23805|
+-----------------------+



In [4]:
# Persist both frames as parquet, replacing any output already at the target path.
twitter_df.write.mode('overwrite').parquet("/data/twitterdata/UserTweetInEurope_df.parquet")
instagram_df.write.mode('overwrite').parquet("/data/instagramdata/instagram_df.parquet")