In [1]:
from pyspark.sql.functions import desc, explode
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType, IntegerType
import time
import datetime

In [2]:
rsvps = spark.read.json("s3a://meetuprsvpbucket/2017/*/*/*")

In [3]:
# to read df file 
# df = spark.read.load("examples/src/main/resources/users.parquet")

In [4]:
rsvps.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_url: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- group: struct (nullable = true)
 |    |-- group_city: string (nullable = true)
 |    |-- group_country: string (nullable = true)
 |    |-- group_id: long (nullable = true)
 |    |-- group_lat: double (nullable = true)
 |    |-- group_lon: double (nullable = true)
 |    |-- group_name: string (nullable = true)
 |    |-- group_state: string (nullable = true)
 |    |-- group_topics: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- topic_name: string (nullable = true)
 |    |    |    |-- urlkey: string (nullable = true)
 |    |-- group_urlname: string (nullable = true)
 |-- guests: long (nullable = true)
 |-- member: struct (nullable = true)
 |    |-- member_id: long (nullable

# Create DataFrame 

In [5]:
lower_string = UserDefinedFunction(lambda x: x.lower(), StringType())
df = rsvps.select(*[lower_string(column) if column == 'topic' \
                    else column for column in rsvps.columns]).cache()

In [6]:
group_df = df.select(df['group']['group_id'].alias('group_id'),\
                     df['group']['group_name'].alias('group_name'),\
                     df['group']['group_city'].alias('group_city'), \
                     df['group']['group_country'].alias('group_country'),\
                     df['group']['group_state'].alias('group_state')).distinct().cache()

In [7]:
topic_df = df.select(df['group']['group_id'].alias('group_id'),\
                     df['event']['event_id'].alias('event_id'),\
                     df['event']['time'].alias('event_time'),\
                     explode(df['group']['group_topics']['topic_name']).alias("topic"),\
                     df['group']['group_city'].alias('group_city')).distinct().cache()

In [8]:
list_topic_df = df.select(df['group']['group_id'].alias('group_id'),\
                     df['event']['event_id'].alias('event_id'),\
                     df['event']['time'].alias('event_time'),\
                     explode(df['group']['group_topics']).alias("topic")).distinct()

In [9]:
event_df = df.select(df['group']['group_id'].alias('group_id_event'),\
                     df['event']['event_id'].alias('event_id'),\
                     df['event']['event_name'].alias('event_name'),\
                     df['event']['time'].alias('event_time'),\
                     df['event']['event_url'].alias('url'),\
                     df['venue']['venue_id'].alias('venue_id')).distinct().cache()

In [10]:
rsvp_df = df.select(df['group']['group_id'].alias('group_id_rsvp'),\
                    df['event']['event_id'].alias('event_id'),\
                    df['rsvp_id'],df['member']['member_id'].alias('member_id'),\
                    df['mtime']).distinct()

In [11]:
event_rsvp_df = df.select(df['group']['group_id'].alias('group_id'),\
                          df['event']['event_id'].alias('event_id'),\
                          df['rsvp_id'],
                          df['event']['time'].alias('event_time'),\
                          df['member']['member_id'].alias('member_id'),\
                          df['mtime'].alias('rsvp_time'),\
                          df['group']['group_city'].alias('group_city'), \
                          df['group']['group_country'].alias('group_country'),\
                          df['group']['group_lat'].alias('group_lat'), \
                          df['group']['group_lon'].alias('group_lon')).distinct().cache()

# Show DataFrame

In [12]:
group_df.show(5)

+--------+--------------------+----------+-------------+-----------+
|group_id|          group_name|group_city|group_country|group_state|
+--------+--------------------+----------+-------------+-----------+
|18772700|Boca Raton Pickup...|Boca Raton|           us|         FL|
|14122952|Sunday Assembly D...|    Denver|           us|         CO|
|10365912|West Madison Cour...|   Madison|           us|         WI|
|20301044|Free yoga class G...|    Genève|           ch|       null|
|20370729|Lafayette Busines...| Lafayette|           us|         CA|
+--------+--------------------+----------+-------------+-----------+
only showing top 5 rows



In [13]:
topic_df.show(5)

+--------+---------+-------------+--------------------+----------+
|group_id| event_id|   event_time|               topic|group_city|
+--------+---------+-------------+--------------------+----------+
| 2691862|238046776|1488596400000|Young Professiona...|Costa Mesa|
| 1752199|237923210|1488439800000|    Online Marketing| Melbourne|
| 1539734|237437727|1488763800000|            Buddhist|Santa Cruz|
| 1041184|237813316|1490529600000|    Outdoor  Fitness|Alpharetta|
|19028673|237620808|1488411000000|               Reiki|    Durham|
+--------+---------+-------------+--------------------+----------+
only showing top 5 rows



In [14]:
list_topic_df.show(5)

+--------+---------+-------------+--------------------+
|group_id| event_id|   event_time|               topic|
+--------+---------+-------------+--------------------+
| 3803352|238024697|1490124600000|[Intellectual Dis...|
|18628440|237343136|1488299400000|[Software Develop...|
|19289182|238023727|1489078800000|[Agile Transforma...|
|16693392|238007833|1491728400000|[Healthy Living,h...|
|18575647|237849363|1488303900000|[User Experience,...|
+--------+---------+-------------+--------------------+
only showing top 5 rows



In [15]:
event_rsvp_df.show(5)

+--------+------------+----------+-------------+---------+-------------+----------+-------------+---------+---------+
|group_id|    event_id|   rsvp_id|   event_time|member_id|    rsvp_time|group_city|group_country|group_lat|group_lon|
+--------+------------+----------+-------------+---------+-------------+----------+-------------+---------+---------+
| 7258352|   238019159|1655586057|1488335400000|126177972|1488252032208|  Portland|           us|    45.52|  -122.69|
| 1799261|qmgxflywfbcb|1655705462|1488380400000|219838255|1488311894988|   Boulder|           us|    40.04|  -105.28|
|  466780|   237778952|1654825525|1488376800000|109812672|1487873548000|    London|           gb|    51.52|     -0.1|
| 4292722|   237934253|1654832698|1488654000000| 13464201|1487876447488|  Rosedale|           us|    39.34|   -76.51|
|20131642|svqtnmywdblc|1651296755|1488286800000|208106715|1487878932546|     Paris|           fr|    48.86|     2.34|
+--------+------------+----------+-------------+--------

In [16]:
rsvp_df.show(5)

+-------------+------------+----------+---------+-------------+
|group_id_rsvp|    event_id|   rsvp_id|member_id|        mtime|
+-------------+------------+----------+---------+-------------+
|     19538170|qghvrlywpbjc|1655633118|193835416|1488282922960|
|      8170522|gbbjvlywdblc|1655403435|200094216|1488174402060|
|     10720312|   238019087|1655403448|196317124|1488174408508|
|       519612|   238017624|1655380223|  9291166|1488162327000|
|      4250182|   237872090|1655457663|192992375|1488207232000|
+-------------+------------+----------+---------+-------------+
only showing top 5 rows



In [17]:
event_df.show(5)

+--------------+---------+--------------------+-------------+--------------------+--------+
|group_id_event| event_id|          event_name|   event_time|                 url|venue_id|
+--------------+---------+--------------------+-------------+--------------------+--------+
|       6365752|237451753|Security and Show...|1490205600000|https://www.meetu...|24329770|
|       1246840|236385746|USING LOA & SPIRI...|1488412800000|https://www.meetu...|    null|
|      18992104|238045071|Thursday night cl...|1488513600000|https://www.meetu...|24706591|
|      18940989|237549034|Open Doors Day@Ca...|1488463200000|https://www.meetu...|23778063|
|       4397212|237603430|Viernes 10 Marzo ...|1489159800000|https://www.meetu...|13702512|
+--------------+---------+--------------------+-------------+--------------------+--------+
only showing top 5 rows



# Topic Count 

In [18]:
topic_count = topic_df.groupby('topic').count().orderBy(desc('count'))

In [19]:
topic_count.show(10)

+-----------------+-----+
|            topic|count|
+-----------------+-----+
|           Social|34489|
|Social Networking|28534|
|      New In Town|27967|
| Self-Improvement|25650|
|        Fun Times|25113|
|         Outdoors|23156|
|       Dining Out|19357|
|          Fitness|18624|
|        Adventure|17345|
|     Spirituality|17268|
+-----------------+-----+
only showing top 10 rows



# Popular topics by city

In [20]:
topic_df.groupBy("group_city","topic").count().orderBy(desc('count')).show(20)

+----------+--------------------+-----+
|group_city|               topic|count|
+----------+--------------------+-----+
|  New York|              Social| 1830|
|  New York|   Social Networking| 1825|
|    London|         New In Town| 1763|
|  New York|         New In Town| 1729|
|    London|   Social Networking| 1597|
|  New York|             Singles| 1582|
|    London|              Social| 1421|
|  New York|           Fun Times| 1324|
|    London|           Fun Times| 1268|
|  New York|           Nightlife| 1198|
|    London|             Singles| 1112|
|    London|           Nightlife| 1028|
|  New York|          Dining Out| 1016|
|  New York|           Adventure|  995|
|  New York|    Self-Improvement|  993|
|    London|    Self-Improvement|  916|
|    London|          Dining Out|  885|
|    London|              London|  860|
|  New York|Professional Netw...|  855|
|    London|            Outdoors|  830|
+----------+--------------------+-----+
only showing top 20 rows



# Most active groups

In [21]:
topic_df.groupBy("group_id","group_city").count().orderBy(desc('count')).show(20)

+--------+----------------+-----+
|group_id|      group_city|count|
+--------+----------------+-----+
| 3332802|   Mission Viejo| 1620|
| 1766167|        Richmond| 1620|
|19773849|       Cambridge| 1518|
|19627026|       Deer Park| 1515|
|20096070|      Cincinnati| 1515|
|  579832|      Washington| 1410|
|18663033|           Tokyo| 1320|
|20421348|          London| 1275|
|18290844|           Tokyo| 1246|
| 1590138|          Austin| 1230|
| 1343044|Saint Petersburg| 1215|
|19777446|   Tel Aviv-Yafo| 1210|
| 1576378|          Austin| 1170|
|19590500|          London| 1134|
|18663131|           Tokyo| 1095|
|  627273|         Norfolk| 1067|
|19106729|        Montréal| 1066|
|  950281|          Boston| 1056|
| 1756412|      Pittsburgh| 1024|
| 1516787|           Dubai| 1020|
+--------+----------------+-----+
only showing top 20 rows



In [22]:
# row1 = df1.agg({"x": "max"}).collect()[0]
min_time = event_df.agg({"event_time": "min"}).collect()[0]
max_time = event_df.agg({"event_time": "max"}).collect()[0]

In [23]:
datetime.datetime.fromtimestamp(int(str(min_time["min(event_time)"])[:-3])).strftime('%Y-%m-%d %H:%M:%S')

'2017-02-17 18:00:00'

In [24]:
datetime.datetime.fromtimestamp(int(str(max_time["max(event_time)"])[:-3])).strftime('%Y-%m-%d %H:%M:%S')

'2025-05-05 23:00:00'

# Save DF to S3

In [25]:
ts = time.time()
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
st

'2017-03-01 09:29:44'

In [26]:
# if you update data, save new dataframe into a new bucket file 

In [27]:
group_df.write.parquet("s3a://meetupdf/group_df")

In [28]:
topic_df.write.parquet("s3a://meetupdf/topic_df")

In [29]:
list_topic_df.write.parquet("s3a://meetupdf/list_topic_df")

In [30]:
event_rsvp_df.write.parquet("s3a://meetupdf/event_rsvp_df")

In [31]:
rsvp_df.write.parquet("s3a://meetupdf/rsvp_df")

In [32]:
event_df.write.parquet("s3a://meetupdf/event_df")