In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import FloatType, TimestampType
from datetime import datetime

In [2]:
spark = SparkSession.builder.appName('pratical_work').getOrCreate()

In [3]:
data = spark.read.json("hdfs:///datasets/geo_curitiba")

In [4]:
important_columns = ["coordinates", "entities", "id_str", "created_at", 
                         "is_quote_status", "quoted_status_id_str", "text"]

data = data.select(important_columns)

In [5]:
map_data = data.select(["quoted_status_id_str", "text"])\
                .where(F.col("quoted_status_id_str").isNotNull()).collect()
map_dict = dict(map(list, map_data))

In [6]:
map_func = F.udf(lambda key: map_dict.get(key, ""))
list_to_item = F.udf(lambda item: item[0])

get_tags = F.udf(lambda tags: [item[1] for item in tags])

get_x = F.udf(lambda item: item[0][0] if item else float('nan'))
get_y = F.udf(lambda item: item[0][1] if item else float('nan'))
get_timestamp = lambda item: datetime.fromtimestamp(int(item)/1000)

convert_timestamp_udf = F.udf(lambda item: get_timestamp(item), TimestampType())

In [7]:
important_columns = ["coordinates", "id_str", "text", "hashtags", "created_at"]

data = data.withColumn("quoted_text", map_func(data.quoted_status_id_str))
data = data.withColumn("text", F.when(data.is_quote_status == True, \
                                        F.concat(F.col("text"), F.lit(" "), F.col("quoted_text")))\
                                .otherwise(data.text))

data = data.withColumn("created_at", list_to_item(data.created_at))
data = data.withColumn("hashtags", data.entities.hashtags)
data = data.select(important_columns)

In [8]:
data.show(1)

+-----------+------------------+--------------------+--------+-------------+
|coordinates|            id_str|                text|hashtags|   created_at|
+-----------+------------------+--------------------+--------+-------------+
|       null|726561770303840256|Olha o universo j...|      []|1462060799000|
+-----------+------------------+--------------------+--------+-------------+
only showing top 1 row



In [9]:
data = data.withColumn("created_at", convert_timestamp_udf(data.created_at))

In [10]:
data = data.withColumn("x", get_x(data.coordinates).cast(FloatType()))
data = data.withColumn("y", get_y(data.coordinates).cast(FloatType()))

In [11]:
data.show(1)

+-----------+------------------+--------------------+--------+-------------------+---+---+
|coordinates|            id_str|                text|hashtags|         created_at|  x|  y|
+-----------+------------------+--------------------+--------+-------------------+---+---+
|       null|726561770303840256|Olha o universo j...|      []|2016-04-30 23:59:59|NaN|NaN|
+-----------+------------------+--------------------+--------+-------------------+---+---+
only showing top 1 row



In [12]:
data = data.withColumn("hashtags", get_tags(data.hashtags))

In [13]:
data = data.withColumn("year", F.year(data.created_at))
data = data.withColumn("month", F.month(data.created_at))
data = data.withColumn("day", F.dayofmonth(data.created_at))
data = data.withColumn("hour", F.hour(data.created_at))

In [14]:
data = data.drop("coordinates", "created_at")

In [15]:
data.show(1)

+------------------+--------------------+--------+---+---+----+-----+---+----+
|            id_str|                text|hashtags|  x|  y|year|month|day|hour|
+------------------+--------------------+--------+---+---+----+-----+---+----+
|726561770303840256|Olha o universo j...|      []|NaN|NaN|2016|    4| 30|  23|
+------------------+--------------------+--------+---+---+----+-----+---+----+
only showing top 1 row



In [16]:
data.printSchema()

root
 |-- id_str: string (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



In [17]:
data.write.parquet("hdfs:///user/ghra2016/cleaned_data")