In [1]:
from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id
import json

In [2]:
from pyspark.sql.functions import explode

In [None]:
from pyspark.sql import SQLContext

In [3]:
import pandas as pd

In [4]:
# Take the SparkContext from the `spark` session object.
# NOTE(review): `spark` is not created in any visible cell; presumably the kernel provides it — confirm.
sc = spark.sparkContext

### Read text files

In [5]:
# Glob matching every object four directory levels below the 2017 prefix of the bucket.
rsvp_glob = "s3a://meetupstream/2017/*/*/*/*"
lines = sc.textFile(rsvp_glob)

In [6]:
# Split every record on newline characters, giving a list of pieces per record.
# NOTE(review): textFile records are presumably already newline-free, so each
# list would hold a single element — confirm before relying on more than index 0.
parts = lines.map(lambda line: line.split("\n"))

In [7]:
# Parse each record: json.loads decodes element 0 of the split line, and the decoded
# value is then evaluated as a Python expression to populate the `rsvp` field of a Row.
# NOTE(review): eval() will execute arbitrary code found in the input files. If the
# decoded payload is always a plain Python literal, ast.literal_eval would be a safer
# substitute — confirm the data format before changing.
json_rdd=parts.map(lambda l: Row(rsvp = eval(json.loads(l[0]))))

### Group topics

In [8]:
def _group_topics_row(record):
    """Build a Row with the group id and topic list found under record['rsvp']['group']."""
    group = record['rsvp']['group']
    return Row(group_id=group['group_id'], group_topic=group['group_topics'])


# One (group_id, group_topic) Row per parsed RSVP record.
group_topics_rdd = json_rdd.map(_group_topics_row)

In [9]:
# Peek at the topic list of the first record. Only one row is needed, so ask
# Spark for exactly one instead of fetching three and discarding two.
group_topics_rdd.first()['group_topic']

[{'topic_name': 'Spirituality', 'urlkey': 'spirituality'},
 {'topic_name': 'Witchcraft', 'urlkey': 'everything-witchcraft'},
 {'topic_name': 'Witches', 'urlkey': 'witches'},
 {'topic_name': 'Pagans & Witches', 'urlkey': 'pagans-witches'},
 {'topic_name': 'Wiccan', 'urlkey': 'wiccan'},
 {'topic_name': 'Pagan', 'urlkey': 'pagan'},
 {'topic_name': 'Herbalists', 'urlkey': 'herbalists'},
 {'topic_name': 'Magick', 'urlkey': 'magick'},
 {'topic_name': 'Earth-Based Spirituality',
  'urlkey': 'earth-based-spirituality'},
 {'topic_name': 'Pagan and Earth Based Spirituality',
  'urlkey': 'pagan-and-earth-based-spirituality'},
 {'topic_name': 'Building a Pagan Community',
  'urlkey': 'building-a-pagan-community'}]

In [10]:
# Turn the RDD of Rows into a DataFrame; the schema is inferred from the Row contents.
group_topics_df = spark.createDataFrame(group_topics_rdd)

In [11]:
# Show the inferred schema to check how the nested topic list was typed.
group_topics_df.printSchema()

root
 |-- group_id: long (nullable = true)
 |-- group_topic: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [12]:
# Emit one row per (group, topic entry): explode() unpacks the `group_topic`
# array, and the unpacked element lands in a column named `col`.
group_topics_explode = group_topics_df.select('group_id', explode('group_topic'))


In [21]:
# Keep only the human-readable topic name from each exploded topic map,
# then collapse repeated (group_id, topic) pairs.
topic_name = group_topics_explode['col'].getItem('topic_name').alias('group_topic')
group_topics_isolate = group_topics_explode.select('group_id', topic_name).distinct()

In [22]:
# Preview the first 20 (group_id, group_topic) rows.
group_topics_isolate.take(20)

[Row(group_id=18442105, group_topic=u'Massage Exchange'),
 Row(group_id=18506339, group_topic=u'Social Events in the Edinburgh Area'),
 Row(group_id=18828826, group_topic=u'New Technology'),
 Row(group_id=13341812, group_topic=u'Predictive Analytics'),
 Row(group_id=11331232, group_topic=u'Charity Events'),
 Row(group_id=249197, group_topic=u'Mountain Biking'),
 Row(group_id=18479804, group_topic=u'First Time Real Estate Investing'),
 Row(group_id=1770743, group_topic=u"Women's Fitness"),
 Row(group_id=11356872, group_topic=u'Small Business Owners'),
 Row(group_id=1391310, group_topic=u'Singing'),
 Row(group_id=1714360, group_topic=u'Qigong Excercise'),
 Row(group_id=1810078, group_topic=u'Spirituality'),
 Row(group_id=19564464, group_topic=u"Men's Support Groups"),
 Row(group_id=18456912, group_topic=u'Knitting'),
 Row(group_id=2546182, group_topic=u'Coffee'),
 Row(group_id=7948592, group_topic=u'German Language'),
 Row(group_id=17097262, group_topic=u'Atheist'),
 Row(group_id=1949070

In [23]:
# Number of distinct (group_id, group_topic) pairs.
group_topics_isolate.count()

245671

In [28]:
# Pull the distinct group/topic pairs onto the driver as a pandas frame,
# then write them to a local UTF-8 CSV file.
group_topics_pdf = group_topics_isolate.toPandas()
group_topics_pdf.to_csv('group_topics.csv', encoding = 'utf-8')

### Create DataFrame

In [15]:
# Build a DataFrame from the RDD of parsed records; each row has a single `rsvp` field
# whose schema is inferred by Spark.
data = spark.createDataFrame(json_rdd)

In [16]:
# Show the inferred schema of the `rsvp` column.
data.printSchema()

root
 |-- rsvp: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [17]:
# Number of RSVP records loaded.
data.count()

74648

In [20]:
# Tag every record with a generated `row_id`, placed ahead of the `rsvp` column.
# NOTE(review): per the Spark docs these ids are unique and increasing but not
# consecutive — confirm nothing downstream expects a dense 0..n-1 sequence.
row_id = monotonically_increasing_id().alias('row_id')
data = data.select(row_id, 'rsvp')

### 3NF tables

In [24]:
def _aliased(parent, fields):
    """Return `parent[key].alias(alias)` for every (key, alias) pair in `fields`, in order."""
    return [parent[key].alias(alias) for key, alias in fields]


# Handles to the nested maps stored inside the `rsvp` column.
rsvp_map = data['rsvp']
venue_map = rsvp_map['venue']
event_map = rsvp_map['event']
member_map = rsvp_map['member']
group_map = rsvp_map['group']

# Dimension tables: one distinct row per combination of the selected attributes.
venue_DF = data.select(*_aliased(venue_map, [
    ('venue_id', 'venue_id'),
    ('lat', 'venue_lat'),
    ('lon', 'venue_lon'),
    ('venue_name', 'venue_name'),
    ('visibility', 'venue_visibility'),
])).distinct()

event_DF = data.select(*_aliased(event_map, [
    ('event_id', 'event_id'),
    ('event_name', 'event_name'),
    ('event_url', 'event_url'),
    ('time', 'event_time'),
])).distinct()

member_DF = data.select(*_aliased(member_map, [
    ('member_id', 'member_id'),
    ('member_name', 'member_name'),
    ('photo', 'member_photo'),
])).distinct()

group_DF = data.select(*_aliased(group_map, [
    ('group_id', 'group_id'),
    ('group_country', 'group_country'),
    ('group_city', 'group_city'),
    ('group_lat', 'group_lat'),
    ('group_long', 'group_long'),
    ('group_urlname', 'group_urlname'),
])).distinct()

# Fact table: one row per record, carrying the ids of the four dimensions
# plus the RSVP's own attributes. No de-duplication is applied here.
rsvp_DF = data.select(
    data['row_id'].alias('row_id'),
    venue_map['venue_id'].alias('venue_id'),
    event_map['event_id'].alias('event_id'),
    member_map['member_id'].alias('member_id'),
    group_map['group_id'].alias('group_id'),
    *_aliased(rsvp_map, [
        ('rsvp_id', 'rsvp_id'),
        ('guests', 'guests'),
        ('mtime', 'mtime'),
        ('response', 'response'),
    ])
)

### to Pandas

In [27]:

# Collect each table onto the driver as a pandas frame and write it to a local
# UTF-8 CSV file, in the same order as before: venue, event, group, member, rsvp.
csv_exports = [
    (venue_DF, 'venue.csv'),
    (event_DF, 'event.csv'),
    (group_DF, 'group.csv'),
    (member_DF, 'member.csv'),
    (rsvp_DF, 'rsvp.csv'),
]
for table_df, csv_name in csv_exports:
    table_df.toPandas().to_csv(csv_name, encoding = 'utf-8')

### Create 1NF table

In [None]:
# SQL entry point built on the existing SparkContext.
# NOTE(review): this relies on the `from pyspark.sql import SQLContext` cell, which is
# marked `In [None]` (never executed) — run that import first on a fresh kernel.
sqlCtx = SQLContext(sc)

In [None]:
# Spark SQL resolves table names against the session catalog, not against
# Python variable names. Each DataFrame therefore has to be registered as a
# temp view under the name the query uses, otherwise the query fails with a
# "table or view not found" error.
sql_views = [
    ('rsvp_DF', rsvp_DF),
    ('group_DF', group_DF),
    ('group_topics_isolate', group_topics_isolate),
    ('event_DF', event_DF),
    ('member_DF', member_DF),
    ('venue_DF', venue_DF),
]
for view_name, view_df in sql_views:
    view_df.createOrReplaceTempView(view_name)

# Denormalise: start from the RSVP fact table and left-join every dimension.
# The query runs through `spark`, the session the DataFrames were created
# from, so the temp views registered above are guaranteed to be visible.
# Joining on group topics repeats each RSVP once per topic of its group.
# NOTE(review): .collect() brings the whole joined result onto the driver as a
# list of Rows; kept so `data_1NF` stays a list — confirm it fits in memory.
data_1NF = spark.sql(""" SELECT * 
                    FROM rsvp_DF r 
                    LEFT JOIN group_DF g on r.group_id = g.group_id
                    LEFT JOIN group_topics_isolate t on r.group_id = t.group_id
                    LEFT JOIN event_DF e on r.event_id = e.event_id
                    LEFT JOIN member_DF m on r.member_id = m.member_id
                    LEFT JOIN venue_DF v on r.venue_id = v.venue_id
                    """).collect()