# Raw data exploration (Python)

This notebook contains some raw data exploration done in Python. Python is convenient for this work (compared to Scala) because it lets us parse JSON into a dict without specifying types for the resulting data structure.

In [1]:
# note: our notebook comes with a Spark session already defined
spark

## Reading the raw data
Let's load the data from JSON into a list of dicts. The data set isn't large; it fits comfortably in memory.

In [2]:
import glob
import os
import json
import datetime

data_dir = 'raw_data/rocdev'
files = glob.glob(data_dir + '/*/*.json')

all_data = []
samples = {}

# let's grab 10 sample values for each field
def do_sample(k, v):
    if k not in samples:
        samples[k] = []
    if len(samples[k]) < 10:
        samples[k].append(v)
        
# loop through the files, read them as dicts
for file in files:
    with open(file, 'r') as f:
        # the channel name and date is embedded in the file name
        _, _, channel, fn = file.split('/')
        date = os.path.splitext(fn)[0]
        data = json.load(f)

        # accumulate the data and samples
        for datum in data:
            datum['channel'] = channel
            datum['date'] = date
            for k, v in datum.items():
                do_sample(k, v)
            all_data.append(datum)
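
The `file.split('/')` unpacking above only works while `data_dir` is exactly two directories deep. A minimal sketch of an alternative that reads the channel and date from the end of the path instead, assuming the same `<channel>/<date>.json` layout; the helper name `parse_channel_and_date` is hypothetical and the path is only illustrative:

```python
from pathlib import Path

def parse_channel_and_date(path):
    """Return (channel, date) for a path ending in <channel>/<date>.json."""
    p = Path(path)
    # the parent directory is the channel; the file stem is the date
    return p.parent.name, p.stem

print(parse_channel_and_date('raw_data/rocdev/apple/2017-08-14.json'))
# → ('apple', '2017-08-14')
```

Because only the last two path components are used, this keeps working if `data_dir` is moved or nested more deeply.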

In [3]:
# how many rows do we have?
len(all_data)

168340

In [4]:
# what does the first row look like?
all_data[0]

{'type': 'message',
 'text': 'sorry, I just spotted your reply. Yes paid account. Perhaps I don’t have the right permissions on the account. Thanks',
 'user': 'U42EURZU4',
 'ts': '1502724840.797357',
 'channel': 'apple',
 'date': '2017-08-14'}
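
The `ts` field looks like a Unix epoch timestamp in seconds, stored as a string. Assuming that reading is right (it agrees with the `date` taken from the file name for this row), here is a small sketch of converting it to a timezone-aware datetime; `ts_to_datetime` is a hypothetical helper, not part of the notebook:

```python
import datetime

def ts_to_datetime(ts):
    """Convert an epoch-seconds string to a UTC datetime."""
    return datetime.datetime.fromtimestamp(float(ts), tz=datetime.timezone.utc)

dt = ts_to_datetime('1502724840.797357')
print(dt.date().isoformat())  # → 2017-08-14
```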

Now we want to create a dataframe. We don't have a schema, but Spark can infer one if we feed it all of the JSON.

In [5]:
all_json = [json.dumps(e) for e in all_data]
df = spark.read.json(spark.sparkContext.parallelize(all_json))
df.printSchema()

root
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- actions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- confirm: struct (nullable = true)
 |    |    |    |    |    |-- dismiss_text: string (nullable = true)
 |    |    |    |    |    |-- ok_text: string (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |-- title: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- style: string (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |-- audio_html: string (nullable = true)
 |    |    |-- audio_html_height: long (nullable = true)
 |    |    |-- audio_html_width: long (

Well, that's gnarly, but it'll work for now. If we wanted to make this a production system, we might want to put some more effort into finding a better schema to represent this data.
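
One cheap way to tame the inferred schema is to keep only the fields we care about before serializing each row. The sketch below is plain Python and assumes the six keys seen in the first row are the ones of interest; `CORE_FIELDS` and `project` are hypothetical names, and the sample row is abbreviated:

```python
import json

# assumption: these are the only fields we want Spark to see
CORE_FIELDS = ('type', 'text', 'user', 'ts', 'channel', 'date')

def project(datum, fields=CORE_FIELDS):
    """Keep only the chosen keys; missing keys become None."""
    return {k: datum.get(k) for k in fields}

row = {'type': 'message', 'user': 'U42EURZU4', 'ts': '1502724840.797357',
       'channel': 'apple', 'date': '2017-08-14',
       'attachments': [{'actions': []}]}
slim = project(row)
print(json.dumps(slim))
```

Serializing `project(e)` instead of `e` for each row should give Spark a much flatter schema to infer, at the cost of dropping nested fields such as `attachments`.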

In [6]:
# just to illustrate some pyspark functionality, let's find the top channels
from pyspark.sql import functions as F
(df
    .groupBy('channel')
    .count()
    .sort(F.col('count').desc())
    .show())

+-----------------+-----+
|          channel|count|
+-----------------+-----+
|          general|71493|
|          careers|16418|
|        mentoring| 9293|
|         politics| 6754|
|          paychex| 5639|
|       javascript| 5158|
|         security| 4267|
|           gaming| 3910|
|   remote-workers| 2839|
|american-football| 2777|
|           devops| 2602|
|           python| 2440|
|              git| 2397|
|           random| 2362|
|             food| 2138|
|              www| 1897|
|           status| 1632|
|fakeinternetmoney| 1542|
|   ethics-in-tech| 1496|
|           dotnet| 1424|
+-----------------+-----+
only showing top 20 rows
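
Since the raw rows are still in memory, the same ranking can be cross-checked without Spark. A minimal sketch using `collections.Counter`; the three rows here are made-up stand-ins for `all_data`:

```python
from collections import Counter

# stand-in for all_data; only the 'channel' key matters here
rows = [{'channel': 'general'}, {'channel': 'careers'}, {'channel': 'general'}]

channel_counts = Counter(row['channel'] for row in rows)

# most_common returns (channel, count) pairs sorted by count, descending
for channel, count in channel_counts.most_common(20):
    print(channel, count)
```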



We can execute Spark SQL on a dataframe by registering it as a temp view.

In [7]:
df.createOrReplaceTempView("events")

spark.sql("select channel, count(1) as count from events group by channel order by count desc").show()

+-----------------+-----+
|          channel|count|
+-----------------+-----+
|          general|71493|
|          careers|16418|
|        mentoring| 9293|
|         politics| 6754|
|          paychex| 5639|
|       javascript| 5158|
|         security| 4267|
|           gaming| 3910|
|   remote-workers| 2839|
|american-football| 2777|
|           devops| 2602|
|           python| 2440|
|              git| 2397|
|           random| 2362|
|             food| 2138|
|              www| 1897|
|           status| 1632|
|fakeinternetmoney| 1542|
|   ethics-in-tech| 1496|
|           dotnet| 1424|
+-----------------+-----+
only showing top 20 rows

