In [1]:
import pandas as pd
import numpy as np
import time
from pyspark.sql import SparkSession

'''
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config('spark.sql.execution.arrow.pyspark.enabled', True) \
        .config('spark.sql.session.timeZone', 'UTC') \
        .config('spark.driver.memory','3G') \
        .config('spark.ui.showConsoleProgress', True) \
        .config('spark.sql.repl.eagerEval.enabled', True) \
        .getOrCreate()
    return spark
'''


# create spark session
spark=SparkSession.builder.appName('explore').getOrCreate()
#spark = create_spark_session()

# load csv into df
data_path = '/Users/ianjacobsen/Projects/Duo/data_files/'
filename = 'learning_traces.csv'
df=spark.read.option("header","true").csv(data_path+filename)

# drop duplicates
df = df.distinct()

# view nicely formatted pandas df
#df.limit(10).toPandas()

# count number of rows
#df.count() # 12,854,226 rows

### Create dim_users table

In [2]:
# create users table... count unique timestamps to find number of sessions
users = (df.select(['user_id', 'timestamp']).drop_duplicates()).groupBy('user_id').count()
dim_users = users.withColumnRenamed('count', 'number_of_sessions')

# view users sorted by number of sessions
#users.orderBy(users.number_of_sessions.desc()).show()

### Create dim_times table

- time table: timestamp, year, month, day, dayofweek, hour, minute, second

- data is stored as a unix epoch (https://www.epochconverter.com)

In [3]:
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from datetime import datetime
from pyspark.sql import functions as f

# create times table
times = df.select('timestamp')
times = times.withColumnRenamed('timestamp', 'epoch')

# convert epochs to timestamps
timestamp = udf(lambda x: str(datetime.fromtimestamp(int(x))))
times = times.withColumn('timestamp', timestamp(times.epoch))

# extract granular times data, store as new columns
get_hour = udf(lambda x: datetime.fromtimestamp(int(x)).hour)
get_day = udf(lambda x: datetime.fromtimestamp(int(x)).day)
get_week = udf(lambda x: datetime.fromtimestamp(int(x)).isocalendar()[1])
get_month = udf(lambda x: datetime.fromtimestamp(int(x)).month)
get_year = udf(lambda x: datetime.fromtimestamp(int(x)).year)
get_weekday = udf(lambda x: datetime.fromtimestamp(int(x)).weekday())
times = times.withColumn('hour', get_hour(times.epoch))
times = times.withColumn('day', get_day(times.epoch))
times = times.withColumn('week', get_week(times.epoch))
times = times.withColumn('month', get_month(times.epoch))
times = times.withColumn('year', get_year(times.epoch))
dim_times = times.withColumn('weekday', get_weekday(times.epoch))

### Import language reference table

In [4]:
import json

# read language reference table into df
lang_filename = 'language-codes-full_json.json'
lang_df = spark.read.json(data_path+lang_filename)

# drop duplicates
lang_df = lang_df.distinct()

### Create dim_langs table

In [5]:
# find languages that are used
learn_langs = df.select(['learning_language']).drop_duplicates()
ui_langs = df.select(['ui_language']).drop_duplicates()

# concat and drop duplicates
used_list = list(learn_langs.toPandas()['learning_language']) + list(ui_langs.toPandas()['ui_language'])
used_list = list(set(used_list))

# create table via Pandas
dim_langs = spark.createDataFrame(pd.DataFrame({'alpha2_code': used_list}))

# join with reference table
dim_langs = dim_langs.join(lang_df, dim_langs.alpha2_code == lang_df.alpha2).drop('French', 'alpha2', 'alpha3-b', 'alpha3-t')

dim_langs.show()

+-----------+------------------+
|alpha2_code|           English|
+-----------+------------------+
|         en|           English|
|         fr|            French|
|         pt|        Portuguese|
|         de|            German|
|         es|Spanish; Castilian|
|         it|           Italian|
+-----------+------------------+



### Form language pairs table for analysts

### Load lexeme reference table

In [6]:
# load txt file containing breakdown of lexeme codes
lex_filename = 'lexeme_reference.txt'
lex_df = spark.read.text(data_path+lex_filename)

# make data ingestible... pyspark not able to infer schema as-is
with open(data_path+lex_filename) as f:
    lines = f.readlines()
    f.close()
lex_list = []
for line in lines:
    all_split = line.split()
    lex_list.append([all_split[0], all_split[1], ' '.join(all_split[2:])])

# load txt file to dataframe
from pyspark.sql.types import ArrayType, StructField, StructType, StringType
schema = StructType([
    StructField('code', StringType(), True),
    StructField('type', StringType(), True),
    StructField('description', StringType(), True)
])
lex_df = spark.createDataFrame(lex_list,schema)

# filter word types... we only want parts of speech in dimension table
lex_df = lex_df.filter(lex_df.type == 'POS').select(['code', 'description'])
lex_df = lex_df.withColumnRenamed('description', 'part_of_speech')

# drop duplicates
lex_df = lex_df.distinct()

### Create dim_words table

In [7]:
import re

# create df
words = df.select(['lexeme_id', 'lexeme_string', 'learning_language'])
words = words.withColumnRenamed('learning_language', 'language')
words = words.drop_duplicates()

# extract granular data, create new cols
get_lemma = udf(lambda x: re.search('/(.*?)<', x)[0][1:-1])
get_surface = udf(lambda x: re.search('(.*?)/', x)[0][0:-1])
get_pos = udf(lambda x: re.search('<(.*?)>', x)[0][1:-1])
words = words.withColumn('lemma', get_lemma(words.lexeme_string))
words = words.withColumn('surface', get_surface(words.lexeme_string))
words = words.withColumn('pos', get_pos(words.lexeme_string))
words = words.drop('lexeme_string')

# look-up part of speech, save to table
dim_words = words.join(lex_df, words.pos == lex_df.code, 'left').drop('code', 'pos')

### Create fact_wordviews table

In [8]:
# create word views dataframe
wv_df = df.select(['timestamp', 'user_id', 'learning_language', 'ui_language',
                   'lexeme_id', 'delta', 'history_seen', 'history_correct', 
                   'session_seen', 'session_correct'])

# lookup learning language
#wv_df = wv_df.join(dim_langs, wv_df.learning_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code', 'learning_language')
#wv_df = wv_df.withColumnRenamed('English', 'learning_language')

# lookup ui language
#wv_df = wv_df.join(dim_langs, wv_df.ui_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code', 'ui_language')
#wv_df = wv_df.withColumnRenamed('English', 'ui_language')

# lookup word name (lemma form)
#wv_df = wv_df.join(words, wv_df.lexeme_id == words.lexeme_id, 'left').drop('lexeme_id', 'language', 'surface', 'part_of_speech')

# calculate statistics

percent_correct = udf(lambda x, y: 100 * round(float(x)/float(y), 2))

wv_df = wv_df.withColumn('session_pct', percent_correct(wv_df.session_correct, wv_df.session_seen))
wv_df = wv_df.withColumn('history_pct', percent_correct(wv_df.history_correct, wv_df.history_seen))

# drop granular
wv_df = wv_df.drop('history_seen', 'history_correct', 'session_seen', 'session_correct')

# epoch to timestamp... could have JOIN'd with times table... but better to transform existing column than big join
timestamp = udf(lambda x: str(datetime.fromtimestamp(int(x))))
fact_wordviews = wv_df.withColumn('timestamp', timestamp(wv_df.timestamp))

# Fact Table

In [9]:
fact_wordviews.limit(10).toPandas()

Unnamed: 0,timestamp,user_id,learning_language,ui_language,lexeme_id,delta,session_pct,history_pct
0,2013-02-28 15:16:19,u:fVVF,es,en,0db3ffe210ff31425040e140f5ab422b,34198,100.0,100.0
1,2013-02-28 15:18:47,u:hWIZ,pt,en,99cebc64d09034216ee6c3fe786eeb85,1425,100.0,100.0
2,2013-02-28 15:20:52,u:gQ-5,it,en,290e815d93653e3d13979d367f9725cf,187739,100.0,100.0
3,2013-02-28 15:38:24,u:gxjx,es,en,5fbb4578c6e08d3122b2321720d682d7,3454146,0.0,100.0
4,2013-02-28 15:46:46,u:hyhn,es,en,79fbfd84953c70f664cfe7adac5f7801,284996,100.0,100.0
5,2013-02-28 15:54:34,u:e4Dx,es,en,09016fe20fa8b2cc8b33dc5deb21e72f,3862426,100.0,100.0
6,2013-02-28 16:01:32,u:ifjZ,de,en,513ccd4f1725747bd2e1fafbfee3d435,375,100.0,100.0
7,2013-02-28 16:02:19,u:ffuF,es,en,88d32763695c33d85f3e8af4bb09b6ee,422,100.0,100.0
8,2013-02-28 16:03:33,u:uKm,es,en,5e58ef985b961db85cad5293e30b0892,23081652,100.0,100.0
9,2013-02-28 16:13:01,u:hMBq,it,en,9a04a9af259780659fa790d55e5e700e,1738531,33.0,100.0


# Dimension Tables

In [10]:
dim_words.limit(10).toPandas()

Unnamed: 0,lexeme_id,language,lemma,surface,part_of_speech
0,53fea04539ece170abd38bc9463a1f18,fr,demain,demain,
1,eb5deee1fa10b4cdcaea9928669bf4f1,fr,que,que,
2,65d763ca3bbc0b12f476051a7b515681,fr,que,qu',
3,e0a060dbfa9d94237cb1113bbed9595b,fr,que,que,
4,4cfa43d063858472c6ff8ae5eebb881e,fr,que,qu',
5,8bd6d060bb604e17c936418f835d87c8,fr,mon,mon,Determiner
6,cb9b44daeb2fd896731bcdef127db8c3,es,nuestro,nuestros,Determiner
7,192059cfe22cedd84cd400a9b1b47171,es,nuestro,nuestra,Determiner
8,61ed985e99a3bdc4b497a5e0a46b104d,de,viel,vieles,Determiner
9,b3f8a12ba4ee5200897e30a18125d710,pt,aquele,aquela,Determiner


In [11]:
dim_users.limit(10).toPandas()

Unnamed: 0,user_id,number_of_sessions
0,u:hIc2,2
1,u:cu2y,46
2,u:hpgM,52
3,u:e4vL,1
4,u:hVlN,26
5,u:iC5O,44
6,u:i1-e,16
7,u:iRWs,8
8,u:bWsA,1
9,u:eCK1,19


In [12]:
dim_times.limit(10).toPandas()

Unnamed: 0,epoch,timestamp,hour,day,week,month,year,weekday
0,1362082579,2013-02-28 15:16:19,15,28,9,2,2013,3
1,1362082727,2013-02-28 15:18:47,15,28,9,2,2013,3
2,1362082852,2013-02-28 15:20:52,15,28,9,2,2013,3
3,1362083904,2013-02-28 15:38:24,15,28,9,2,2013,3
4,1362084406,2013-02-28 15:46:46,15,28,9,2,2013,3
5,1362084874,2013-02-28 15:54:34,15,28,9,2,2013,3
6,1362085292,2013-02-28 16:01:32,16,28,9,2,2013,3
7,1362085339,2013-02-28 16:02:19,16,28,9,2,2013,3
8,1362085413,2013-02-28 16:03:33,16,28,9,2,2013,3
9,1362085981,2013-02-28 16:13:01,16,28,9,2,2013,3


In [13]:
dim_langs.limit(10).toPandas()

Unnamed: 0,alpha2_code,English
0,en,English
1,fr,French
2,pt,Portuguese
3,de,German
4,es,Spanish; Castilian
5,it,Italian


# Reference Tables

In [66]:
# df.limit(10).toPandas()

Unnamed: 0,p_recall,timestamp,delta,user_id,learning_language,ui_language,lexeme_id,lexeme_string,history_seen,history_correct,session_seen,session_correct
0,1.0,1362082579,34198,u:fVVF,es,en,0db3ffe210ff31425040e140f5ab422b,casa/casa<n><f><sg>,1,1,2,2
1,1.0,1362082727,1425,u:hWIZ,pt,en,99cebc64d09034216ee6c3fe786eeb85,estou/estar<vblex><pri><p1><sg>,1,1,1,1
2,1.0,1362082852,187739,u:gQ-5,it,en,290e815d93653e3d13979d367f9725cf,<*sf>/enorme<adj><mf><*numb>,1,1,2,2
3,0.0,1362083904,3454146,u:gxjx,es,en,5fbb4578c6e08d3122b2321720d682d7,leemos/leer<vblex><pri><p1><pl>,1,1,1,0
4,1.0,1362084406,284996,u:hyhn,es,en,79fbfd84953c70f664cfe7adac5f7801,<*sf>/cuchara<n><f><*numb>,1,1,1,1
5,1.0,1362084874,3862426,u:e4Dx,es,en,09016fe20fa8b2cc8b33dc5deb21e72f,estudiantes/estudiante<n><mf><pl>,1,1,2,2
6,1.0,1362085292,375,u:ifjZ,de,en,513ccd4f1725747bd2e1fafbfee3d435,wasser/wasser<n><nt><sg><nom>,1,1,2,2
7,1.0,1362085339,422,u:ffuF,es,en,88d32763695c33d85f3e8af4bb09b6ee,araña/araña<n><f><sg>,1,1,1,1
8,1.0,1362085413,23081652,u:uKm,es,en,5e58ef985b961db85cad5293e30b0892,tamaño/tamaño<n><m><sg>,1,1,1,1
9,0.333333333333,1362085981,1738531,u:hMBq,it,en,9a04a9af259780659fa790d55e5e700e,prego/prego<ij>,1,1,3,1


In [None]:
# lang_df.limit(10).toPandas()

In [None]:
# lex_df.limit(10).toPandas()

# Data Quality Checks

Integrity constraints on the relational database (e.g., unique key, data type, etc.)

Unit tests for the scripts to ensure they are doing the right thing

Source/Count checks to ensure completeness

In [None]:
# count to ensure completeness
def qc_source_count(s_df, dim_df, dimension='*'):
    
    source_count = s_df.select([dimension]).drop_duplicates().count()
    dim_count = dim_df.count()
    
    if source_count != dim_count:
        return 'source/count check failed on dimension {}'.format(dimension)
    else:
        return 'data good'

# count number of users in learning traces... compare with size of users table
'''
user_count = qc_source_count(df, dim_users, 'user_id')
if user_count != 'data good':
    print(user_count)

# count number of rows in learning traces... compare with size of word views table
views_count = qc_source_count(df, fact_wordviews)
if views_count != 'data good':
    print(views_count)

# count number of languages in learning traces... compare with size of langs table
langs = df.select(['learning_language']).union(df.select(['ui_language'])).distinct()
langs_count = qc_source_count(langs, dim_langs)
'''

In [None]:
# count number of timestamps in learning traces... compare with size of times table
timestamp_count = qc_source_count(df, dim_times, 'timestamp')
if timestamp_count != 'data good':
    print(timestamp_count)

### primary key checks

In [16]:
pk = dim_langs.select(['alpha2_code']).count()

In [22]:
# check uniqueness of primary key for dimension tables
def qc_check_pk_unique(df, pkey):
    table_size = df.count()
    pk_count = df.select([pkey]).count()
    
    if table_size != pk_count:
        return False
    else:
        return True

# count to ensure completeness of data
def qc_source_count(s_df, dim_df, dimension='*'):
    
    source_count = s_df.select([dimension]).drop_duplicates().count()
    dim_count = dim_df.count()
    
    if source_count != dim_count:
        return False
    else:
        return True

# Analyst Tables

- instead of forming the tables in Spark... write the queries for Redshift

### Table 1:

#### Language Pairs: ui lang, learn lang, number of users learning, number of sessions for pair, number of words seen for pair

In [38]:
# form language pairs table
pairs = fact_wordviews.select(['learning_language', 'ui_language']).drop_duplicates()

# match language pairs alpha2 codes with English language names
langs = pairs.join(dim_langs, pairs.learning_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code')
langs = langs.withColumnRenamed('English', 'Learning Language')
langs = langs.join(dim_langs, pairs.ui_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code')
langs = langs.withColumnRenamed('English', 'UI Language')

langs.show()

+-----------------+-----------+------------------+------------------+
|learning_language|ui_language| Learning Language|       UI Language|
+-----------------+-----------+------------------+------------------+
|               pt|         en|        Portuguese|           English|
|               de|         en|            German|           English|
|               es|         en|Spanish; Castilian|           English|
|               it|         en|           Italian|           English|
|               fr|         en|            French|           English|
|               en|         pt|           English|        Portuguese|
|               en|         es|           English|Spanish; Castilian|
|               en|         it|           English|           Italian|
+-----------------+-----------+------------------+------------------+



In [61]:
# count how many word views for each pair
wv_pairs = fact_wordviews.select(['learning_language', 'ui_language']).groupBy(['learning_language', 'ui_language']).count()

wv_pairs = wv_pairs.withColumnRenamed('count', 'Total Word Views')\
.withColumnRenamed('learning_language', 'wv_ll')\
.withColumnRenamed('ui_language', 'wv_ul')

wv_pairs = wv_pairs.join(langs, (wv_pairs.wv_ll == langs.learning_language) & 
                         (wv_pairs.wv_ul == langs.ui_language)).drop(*['wv_ll', 
                                                                       'wv_ul', 
                                                                       'learning_language', 
                                                                       'ui_language'])
wv_pairs.show()

+----------------+------------------+------------------+
|Total Word Views| Learning Language|       UI Language|
+----------------+------------------+------------------+
|         1873716|            French|           English|
|          424152|           English|           Italian|
|         3641167|           English|Spanish; Castilian|
|          311480|        Portuguese|           English|
|          949460|           English|        Portuguese|
|          793919|           Italian|           English|
|         1452597|            German|           English|
|         3407654|Spanish; Castilian|           English|
+----------------+------------------+------------------+



In [63]:
# count how many users for each language pair
user_pair

In [69]:
test_df = fact_wordviews.limit(10000)

In [70]:
test_df.show()

+-------------------+-------+-----------------+-----------+--------------------+--------+-----------+-----------+
|          timestamp|user_id|learning_language|ui_language|           lexeme_id|   delta|session_pct|history_pct|
+-------------------+-------+-----------------+-----------+--------------------+--------+-----------+-----------+
|2013-02-28 15:16:19| u:fVVF|               es|         en|0db3ffe210ff31425...|   34198|      100.0|      100.0|
|2013-02-28 15:18:47| u:hWIZ|               pt|         en|99cebc64d09034216...|    1425|      100.0|      100.0|
|2013-02-28 15:20:52| u:gQ-5|               it|         en|290e815d93653e3d1...|  187739|      100.0|      100.0|
|2013-02-28 15:38:24| u:gxjx|               es|         en|5fbb4578c6e08d312...| 3454146|        0.0|      100.0|
|2013-02-28 15:46:46| u:hyhn|               es|         en|79fbfd84953c70f66...|  284996|      100.0|      100.0|
|2013-02-28 15:54:34| u:e4Dx|               es|         en|09016fe20fa8b2cc8...| 3862426

In [111]:
users = test_df.select(['learning_language',
                        'ui_language',
                        'user_id']).distinct().groupBy(['learning_language',
                                                        'ui_language']).count()M

users = users.withColumnRenamed('count', 'number of learners')

users.show()

In [33]:
hs = fact_wordviews.select(['learning_language', 'ui_language']).groupBy(['learning_language', 'ui_language']).count()
hs.show()

+-----------------+-----------+-------+
|learning_language|ui_language|  count|
+-----------------+-----------+-------+
|               fr|         en|1873716|
|               en|         it| 424152|
|               en|         es|3641167|
|               pt|         en| 311480|
|               en|         pt| 949460|
|               it|         en| 793919|
|               de|         en|1452597|
|               es|         en|3407654|
+-----------------+-----------+-------+



In [34]:
he = fact_wordviews.select(['learning_language', 'ui_language', 'user_id', 'timestamp']).drop_duplicates().groupBy(['learning_language', 'ui_language', 'timestamp']).count()
he.show()

+-----------------+-----------+-------------------+-----+
|learning_language|ui_language|          timestamp|count|
+-----------------+-----------+-------------------+-----+
|               es|         en|2013-03-04 21:03:48|    1|
|               es|         en|2013-03-02 12:04:52|    1|
|               es|         en|2013-03-01 23:09:24|    1|
|               pt|         en|2013-03-05 00:41:37|    1|
|               fr|         en|2013-03-11 13:42:21|    1|
|               de|         en|2013-02-28 17:44:09|    1|
|               es|         en|2013-03-05 20:04:06|    1|
|               it|         en|2013-03-08 16:17:27|    1|
|               de|         en|2013-03-02 02:41:25|    1|
|               en|         es|2013-03-06 18:18:37|    1|
|               pt|         en|2013-03-10 19:30:43|    1|
|               fr|         en|2013-03-10 21:30:04|    1|
|               en|         es|2013-03-01 06:07:15|    1|
|               de|         en|2013-03-01 07:52:06|    1|
|             

### Table 2:

#### 

In [None]:
'''
# form language pairs table
pairs = fact_wordviews.select(['learning_language', 'ui_language']).drop_duplicates()

# match language pairs alpha2 codes with English language names
langs = pairs.join(dim_langs, pairs.learning_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code')
langs = langs.join(dim_langs, pairs.ui_language == dim_langs.alpha2_code, 'inner').drop('alpha2_code')

# count how many users are learning each pair
ha = fact_wordviews.select(['learning_language', 'ui_language', 'user_id']).groupBy(['learning_language', 'ui_language']).count()
ha.show()

# count how many word views for each pair
ha = fact_wordviews.select(['learning_language', 'ui_language']).groupBy(['learning_language', 'ui_language']).count()

# count how many sessions for each pair
#ha = fact_wordviews.select(['learning_language', 'ui_language', 'user_id', 'timestamp']).drop_duplicates().groupBy(['learning_language', 'ui_language', 'timestamp']).count()
ha.show()

hs = fact_wordviews.select(['learning_language', 'ui_language', 'user_id', 'timestamp'])
'''

# Write to Parquet Files

In [None]:
output_data = './parquet_files/'

In [None]:
'''
dim_times.write.mode('overwrite').parquet(output_data + 'dim_times.parquet')
dim_langs.write.mode('overwrite').parquet(output_data + 'dim_langs.parquet')
dim_users.write.mode('overwrite').parquet(output_data + 'dim_users.parquet')
dim_words.write.mode('overwrite').parquet(output_data + 'dim_words.parquet')
fact_wordviews.write.mode('overwrite').parquet(output_data + 'fact_wordviews.parquet')
'''