# Where to sell products
## Tweets data exploratory analysis
### Author: Luis Eduardo Ferro Diez <a href="luisedof10@gmail.com">luisedof10@gmail.com</a>, <a href="luis.ferro1@correo.icesi.edu.co">luis.ferro1@correo.icesi.edu.co</a>

This notebook contains code and notes for the exploratory analysis made over the twitter dataset

In [1]:
spark.version

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,,spark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res1: String = 2.4.4


In [2]:
val basePath = "/media/ohtar10/Adder-Storage/datasets/twitter"
val tweetsPath = basePath + "/archiveteam-twitter-stream-2013-09/archiveteam-twitter-stream-2013-09/2013/09/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

basePath: String = /media/ohtar10/Adder-Storage/datasets/twitter
tweetsPath: String = /media/ohtar10/Adder-Storage/datasets/twitter/archiveteam-twitter-stream-2013-09/archiveteam-twitter-stream-2013-09/2013/09/


### Load the tweets

In [3]:
import org.apache.spark.sql.functions._
import spark.implicits._

val tweetsDF = spark.read.json(s"${tweetsPath}/01/*/*", s"${tweetsPath}/02/*/*", s"${tweetsPath}/03/*/*", s"${tweetsPath}/04/*/*", s"${tweetsPath}/05/*/*")
val tweetCount = tweetsDF.count()
val userCount = tweetsDF.select($"user.id".alias("user")).distinct().count()

println(s"Total tweets: ${tweetCount}")
println(s"Total users: ${userCount}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions._
import spark.implicits._
tweetsDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates: array<double>, type: string> ... 25 more fields]
tweetCount: Long = 26118143
userCount: Long = 12140724
Total tweets: 26118143
Total users: 12140724


### Filter by those with place
According to twitter docs, a tweet might contain a place but not exact location. A place is a bounding box where the tweet might have come from. The coordinate is the exact geographical location of the tweet.

#### References:
* https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/intro-to-tweet-json
* https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
* https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects#coordinates-dictionary
* https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects#place-dictionary
 
To partition the data as even as possible, we will add fields corresponding to the timestamp when the tweet was posted and separate them by year, month, day and hour.

In [4]:
import spark.implicits._

val datePattern = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

val tweetsWithPlaces = tweetsDF.select("*").
    where("place is not null").
    withColumn("created_timestamp", to_timestamp($"created_at", datePattern)).
    withColumn("year", year($"created_timestamp")).
    withColumn("month", month($"created_timestamp")).
    withColumn("day", dayofmonth($"created_timestamp")).
    withColumn("hour", hour($"created_timestamp")).
    cache()
    
val withPlaceTweetCount = tweetsWithPlaces.count()
val withPlaceUserCount = tweetsWithPlaces.select($"user.id".alias("user")).distinct().count()

println(s"Total tweets with place: ${withPlaceTweetCount}")
println(s"Total users tweeting with place: ${withPlaceUserCount}")

tweetsWithPlaces.createOrReplaceTempView("tweets_with_place")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import spark.implicits._
datePattern: String = EEE MMM dd HH:mm:ss ZZZZZ yyyy
tweetsWithPlaces: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [contributors: string, coordinates: struct<coordinates: array<double>, type: string> ... 30 more fields]
withPlaceTweetCount: Long = 386523
withPlaceUserCount: Long = 279220
Total tweets with place: 386523
Total users tweeting with place: 279220


In [5]:
%%sql
select * from tweets_with_place limit 20

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…





VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [6]:
tweetsWithPlaces.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- delete: struct (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- user_id: long (nullable = true)
 |    |    |-- user_id_str: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)


In [7]:
val tweetsWithPlacesAndNoCoordinates = tweetsDF.select("*").where("place is not null and coordinates is null")
s"We have ${tweetsWithPlacesAndNoCoordinates.count()} tweets with place but without an exact location."

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

tweetsWithPlacesAndNoCoordinates: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [contributors: string, coordinates: struct<coordinates: array<double>, type: string> ... 25 more fields]
res16: String = We have 62313 tweets with place but without an exact location.


In [8]:
%%sql
select user.following from tweets_with_place where user.following is not null limit 20

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

### Different place types

We can observe from the places that the vast majority of tweets comes from cities, i,e., the granularity of the majority of tweets is city. This would need to be compensated with tweets that have the exact location.

In [9]:
%%sql
select place.place_type as place_type, count(1) as places from tweets_with_place group by place_type

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

### Tweets by language and country

Due to linguistic complexity, we will only consider tweets in english, however, these tweets can come from different countries, even from countries where english is not the official language.

In [10]:
%%sql
select distinct(lang) as lang, place.country as country, count(1) as lang_count from tweets_with_place group by lang, country sort by lang_count desc

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [11]:
%%sql
select place.country as country, count(1) as lang_count from tweets_with_place where lang = 'en' group by country sort by lang_count desc

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [13]:
%%sql
select place.country as country, place.name as place, count(1) as tweets_per_place from tweets_with_place group by country, place

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

### Select and transform tweet data into a flattened parquet table

JSON files occupy significant disk space. We choose to store the results in parquet format since it's more compressed and will help for results storage.

However, some parquet readers in python does not support deeply nested parquet data structures. Hence, we need to flatten the fields so they can be easily imported in other languages or frameworks. Particularly the geometry fields.

In [21]:
import com.esri.core.geometry.ogc.OGCGeometry
import scala.util.{Failure, Success, Try}

/**
 * This function will transform the given geo json object
 * into its WKT representation
 * 
*/
val wktFromGeoJson: String => String = (json: String) => {
   Try(OGCGeometry.fromGeoJson(json).asText()) match {
        case Success(value) => value.toString
        case Failure(exception) => null
    }
}

val stWKTFromGeoJson = udf(wktFromGeoJson)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import com.esri.core.geometry.ogc.OGCGeometry
import scala.util.{Failure, Success, Try}
wktFromGeoJson: String => String = <function1>
stWKTFromGeoJson: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


In [22]:
import org.apache.spark.sql.functions._

val final_tweets = tweetsWithPlaces.select($"id".alias("tweet_id"),
    $"text".alias("tweet_text"),
    $"created_timestamp",
    $"year",
    $"month",
    $"day",
    $"hour",
    $"user.id".alias("user_id"),
    $"user.screen_name".alias("user_name"),
    concat_ws(";", $"entities.hashtags.text").alias("hashtags"),
    concat_ws(";", $"entities.user_mentions.screen_name").alias("user_mentions"),
    concat_ws(";", $"entities.user_mentions.id").alias("user_id_mentions"),
    concat_ws(";", $"entities.urls.expanded_url").alias("expanded_urls"),
    $"favorite_count",
    stWKTFromGeoJson( to_json($"coordinates")).alias("location_geometry"),
    stWKTFromGeoJson( to_json($"place.bounding_box")).alias("place_geometry")
    ).where("place is not null and lang = 'en'")
    
val final_total_tweets = final_tweets.count()
val final_total_users = final_tweets.select($"user_id").distinct().count()
final_tweets.createOrReplaceTempView("tweets")

println(s"The final tweet subset is: ${final_total_tweets}")
println(s"The final user subset is: ${final_total_users}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions._
final_tweets: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tweet_id: bigint, tweet_text: string ... 14 more fields]
final_total_tweets: Long = 153631
final_total_users: Long = 115736
The final tweet subset is: 153631
The final user subset is: 115736


In [23]:
%%sql
select * from tweets limit 20

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [None]:
final_tweets.write.mode("append").partitionBy("year", "month", "day", "hour").parquet(s"$basePath/tweets")