## Q1 and Setup

In [257]:
import os
import findspark
# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook runs on other machines; fall back to the original local install path.
findspark.init(os.environ.get('SPARK_HOME', '/home/danniel/spark-2.1.0-bin-hadoop2.7'))
import pyspark
from pyspark.sql import SparkSession

In [258]:
# Get (or reuse) the Spark session that every later cell reads data through.
spark = SparkSession.builder.appName('Tub_lab').getOrCreate()

## Q2 - Load the two files into spark and join them by user_id.

In [259]:
# Read both CSV inputs with a header row and Spark's schema inference.
# data1 holds the per-user attributes, data2 the per-comment records.
data1 = spark.read.options(header=True, inferSchema=True).csv("solutions_test_file1.csv")
data2 = spark.read.options(header=True, inferSchema=True).csv("solutions_test_file2.csv")

In [260]:
# Inspect the inferred schemas of both inputs before joining them.
data1.printSchema()
data2.printSchema() # data2 carries extra auto-named columns _c5 .. _c11 that must be dropped


root
 |-- user_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_bin: string (nullable = true)
 |-- country: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- language: string (nullable = true)
 |-- creator_country: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [261]:
# Remove the auto-named columns _c5 .. _c11 seen in the schema above,
# leaving only the five named columns of file 2.
overflow_columns = ['_c{}'.format(i) for i in range(5, 12)]
data2 = data2.drop(*overflow_columns)
data2.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- language: string (nullable = true)
 |-- creator_country: string (nullable = true)
 |-- comment_text: string (nullable = true)



In [262]:
# Inner-join users and comments on user_id. Passing the column name as `on`
# keeps a single user_id column in the result, so no explicit drop is needed.
full_data = data1.join(data2, on='user_id', how='inner')
full_data.show(3)

+--------------------+------+-------+-------+-------------+--------+---------------+--------------------+
|             user_id|gender|age_bin|country|     category|language|creator_country|        comment_text|
+--------------------+------+-------+-------+-------------+--------+---------------+--------------------+
|1445876f62cd03a0d...|     m|  35-44|     US|        Music|      en|             US|how you hug a pup...|
|eaadfa19afdb902d0...|     m|  25-34|     NO|Entertainment|      en|             GB|               First|
|8681d7a8a7306c4cf...|     m|  25-34|     US|Entertainment|      en|             US|I bet its so hot ...|
+--------------------+------+-------+-------+-------------+--------+---------------+--------------------+
only showing top 3 rows



## Q3 Compute percentages of categories by gender and report the top 3 categories for each gender as well as the percentages.

In [263]:
from pyspark.sql.functions import format_number

In [264]:
# Quick look: number of comments per category for female users only.
female_rows = full_data.filter(full_data.gender == 'f')
female_rows.groupBy('category').count().show()

+--------------------+-----+
|            category|count|
+--------------------+-----+
|           Education|    5|
|              Gaming|   23|
|       Entertainment|   37|
|     Travel & Events|    2|
|Science & Technology|    3|
|              Sports|    3|
|       Howto & Style|   34|
|    Film & Animation|    9|
|      People & Blogs|   38|
|     News & Politics|    4|
|      Pets & Animals|    1|
|               Music|    9|
|              Comedy|    6|
+--------------------+-----+



In [265]:
# Rows per (gender, category) pair. The dict-style aggregate names its
# output column "count(1)", which the percentage cells below refer to.
count_by_cat_gender = full_data.groupBy('gender', 'category').agg({'*': 'count'})

In [266]:
# Total rows per gender: the denominators for the per-gender percentages below.
# The later cells index this list by position, so sort it explicitly instead of
# relying on the unspecified row order of a bare groupBy().collect().
# Descending order puts 'm' at index 0 and 'f' at index 1
# (assumes gender only takes the values 'm' and 'f' -- TODO confirm).
men_women_count = full_data.groupBy("gender").count().orderBy("gender", ascending = False).collect()
men_women_count[0][1]


826

In [267]:
# Top 3 categories among female users, with each category's share of all female rows.
# Look the female total up by gender value rather than by list position, and name the
# percentage column with alias() instead of renaming Spark's auto-generated column
# name (which embeds the literal total and silently stops matching on other data).
female_total = next(row['count'] for row in men_women_count if row['gender'] == 'f')

count_by_cat_gender\
    .filter(count_by_cat_gender['gender'] == 'f')\
    .select(["gender",
             "category",
             count_by_cat_gender["count(1)"].alias("count"),
             format_number(100*count_by_cat_gender["count(1)"]/female_total, 2)
                 .cast("float")
                 .alias("percentage_among_female")])\
    .orderBy("percentage_among_female", ascending = False).show(3)

+------+--------------+-----+-----------------------+
|gender|      category|count|percentage_among_female|
+------+--------------+-----+-----------------------+
|     f|People & Blogs|   38|                  21.84|
|     f| Entertainment|   37|                  21.26|
|     f| Howto & Style|   34|                  19.54|
+------+--------------+-----+-----------------------+
only showing top 3 rows



In [268]:
# Top 3 categories among male users, with each category's share of all male rows.
# Look the male total up by gender value rather than by list position, and name the
# percentage column with alias() instead of renaming Spark's auto-generated column
# name (which embeds the literal total and silently stops matching on other data).
male_total = next(row['count'] for row in men_women_count if row['gender'] == 'm')

count_by_cat_gender\
    .filter(count_by_cat_gender['gender'] == 'm')\
    .select(["gender",
             "category",
             count_by_cat_gender["count(1)"].alias("count"),
             format_number(100*count_by_cat_gender["count(1)"]/male_total, 2)
                 .cast("float")
                 .alias("percentage_among_male")])\
    .orderBy("percentage_among_male", ascending = False).show(3)

+------+---------------+-----+---------------------+
|gender|       category|count|percentage_among_male|
+------+---------------+-----+---------------------+
|     m|         Gaming|  202|                24.46|
|     m|  Entertainment|  196|                23.73|
|     m|News & Politics|   92|                11.14|
+------+---------------+-----+---------------------+
only showing top 3 rows



## Q4 Using pyspark.ml.feature.Tokenizer split the comments into lists of words. Combine the lists of words from all comments into lists of words by gender and category. Report the top 3 most common words for gender='f' and category='People & Blogs', as well as the number of occurrences.

In [269]:
from pyspark.ml.feature import Tokenizer,CountVectorizer
from pyspark.sql.functions import collect_list, udf, explode, col
from pyspark.sql.types import ArrayType, StringType,IntegerType,FloatType
from pyspark.ml.linalg import SparseVector
from itertools import chain
import pandas as pd

In [270]:
# Peek at one joined row to recall the available columns before selecting the Q4 subset.
full_data.show(1)

+--------------------+------+-------+-------+--------+--------+---------------+--------------------+
|             user_id|gender|age_bin|country|category|language|creator_country|        comment_text|
+--------------------+------+-------+-------+--------+--------+---------------+--------------------+
|1445876f62cd03a0d...|     m|  35-44|     US|   Music|      en|             US|how you hug a pup...|
+--------------------+------+-------+-------+--------+--------+---------------+--------------------+
only showing top 1 row



In [271]:
# Q4 only needs the grouping keys and the raw comment text.
Q4_full_data = full_data.select("gender", "category", "comment_text")
Q4_full_data.show(2)

+------+-------------+--------------------+
|gender|     category|        comment_text|
+------+-------------+--------------------+
|     m|        Music|how you hug a pup...|
|     m|Entertainment|               First|
+------+-------------+--------------------+
only showing top 2 rows



In [272]:
# Split each comment into a list of words (new column "tok_output").
# As the sample output shows, tokens come back lower-cased ("First" -> "first").
tok = Tokenizer(inputCol="comment_text", outputCol= "tok_output")
Q4_data_tok = tok.transform(Q4_full_data)
Q4_data_tok.show(3)

+------+-------------+--------------------+--------------------+
|gender|     category|        comment_text|          tok_output|
+------+-------------+--------------------+--------------------+
|     m|        Music|how you hug a pup...|[how, you, hug, a...|
|     m|Entertainment|               First|             [first]|
|     m|Entertainment|I bet its so hot ...|[i, bet, its, so,...|
+------+-------------+--------------------+--------------------+
only showing top 3 rows



In [273]:
# Gather the per-comment token lists of every (gender, category) group into
# one list-of-lists column, named directly via alias().
Q4_data_agg = Q4_data_tok\
    .groupBy("gender", "category")\
    .agg(collect_list("tok_output").alias("tok_collected"))\
    .orderBy("gender", "category")
Q4_data_agg.show(3)

+------+-------------+--------------------+
|gender|     category|       tok_collected|
+------+-------------+--------------------+
|     f|       Comedy|[WrappedArray(my,...|
|     f|    Education|[WrappedArray(goi...|
|     f|Entertainment|[WrappedArray(ala...|
+------+-------------+--------------------+
only showing top 3 rows



In [274]:
# UDF that concatenates a list of token lists into one flat list of tokens.
flattened = udf(lambda nested: [word for words in nested for word in words],
                ArrayType(StringType()))
# Replace the nested column with its flattened counterpart.
Q4_data_flatted = Q4_data_agg\
    .withColumn("flat_output", flattened("tok_collected"))\
    .drop("tok_collected")
Q4_data_flatted.show(3)

+------+-------------+--------------------+
|gender|     category|         flat_output|
+------+-------------+--------------------+
|     f|       Comedy|[my, walls, as, a...|
|     f|    Education|[goil, sounds, li...|
|     f|Entertainment|[alaric, pls, bab...|
+------+-------------+--------------------+
only showing top 3 rows



In [275]:
# CountVectorizer is not actually needed for Question 4: the cv_output column it adds
# is not read when computing the answer. Q4_vectorized itself is still used by the
# next cell, which filters it down to the requested gender / category.
cv = CountVectorizer(inputCol="flat_output", outputCol="cv_output")
cv_model = cv.fit(Q4_data_flatted)
Q4_vectorized = cv_model.transform(Q4_data_flatted)
Q4_vectorized.show(3,truncate = True)

+------+-------------+--------------------+--------------------+
|gender|     category|         flat_output|           cv_output|
+------+-------------+--------------------+--------------------+
|     f|       Comedy|[my, walls, as, a...|(4440,[0,2,3,5,7,...|
|     f|    Education|[goil, sounds, li...|(4440,[0,1,5,10,1...|
|     f|Entertainment|[alaric, pls, bab...|(4440,[0,1,2,3,4,...|
+------+-------------+--------------------+--------------------+
only showing top 3 rows



In [276]:
# Keep only the single aggregated row for gender 'f' and category "People & Blogs".
Q4_vectorized_f_PnB = Q4_vectorized.where(
    (col("gender") == "f") & (col("category") == "People & Blogs")
)
Q4_vectorized_f_PnB.show()

+------+--------------+--------------------+--------------------+
|gender|      category|         flat_output|           cv_output|
+------+--------------+--------------------+--------------------+
|     f|People & Blogs|[your, makeup, is...|(4440,[0,1,2,3,4,...|
+------+--------------+--------------------+--------------------+



In [277]:

# One row per token of the female / "People & Blogs" group, in a column named "word".
# explode() unnests the array column inside Spark, replacing the earlier
# DataFrame -> RDD -> DataFrame round-trip.
Q4_vocab_f_PnB = Q4_vectorized_f_PnB.select(explode("flat_output").alias("word"))
Q4_vocab_f_PnB.show(2)

+------+
|  word|
+------+
|  your|
|makeup|
+------+
only showing top 2 rows



In [278]:
# Tally every token on the driver, then rank the tallies with pandas.
# countByValue() keys are single-field Rows, so row[0] is the word itself.
Q4_vocab_freq_f_PnB = Q4_vocab_f_PnB.rdd.countByValue()
Q4_df_answer = pd.DataFrame(
    [[row[0], n] for row, n in Q4_vocab_freq_f_PnB.items()],
    columns=["word_female_PeopleNBlogs", "ocurances"],
)
# Three most frequent words for gender 'f' / "People & Blogs".
Q4_df_answer.sort_values('ocurances', ascending=False).head(3)


Unnamed: 0,word_female_PeopleNBlogs,ocurances
63,i,15
34,the,13
68,a,12


## Q5 Compute the frequency of occurrence for each word, normalized by the total number of words for all gender / categories (call this f_all), as well as by the total number of words in each gender / category (call this f_gc). Compute the frequency ratio R = f_gc / f_all. Report the top 3 words ordering by R for gender='f' and category='People & Blogs' after imposing a minimum number of 5 overall occurrences (i.e. in all gender / categories).

In [279]:
# The task requires at least 5 overall occurrences, so we first need the list of words that qualify.
# Fitting a CountVectorizer with minDF = 5 yields such a list,
# available afterwards as cv_model.vocabulary (cv_model being the fitted CountVectorizer model).
# NOTE(review): minDF counts the number of comments (documents) that contain a word, not its total
# occurrences, so a word repeated 5+ times within fewer than 5 comments is excluded -- confirm this is acceptable.

from pyspark.sql.functions import col
Q4_data_tok.show(3)

+------+-------------+--------------------+--------------------+
|gender|     category|        comment_text|          tok_output|
+------+-------------+--------------------+--------------------+
|     m|        Music|how you hug a pup...|[how, you, hug, a...|
|     m|Entertainment|               First|             [first]|
|     m|Entertainment|I bet its so hot ...|[i, bet, its, so,...|
+------+-------------+--------------------+--------------------+
only showing top 3 rows



In [280]:
# Fit on the per-comment token lists so each comment counts as one document.
# minDF=5 keeps only words that appear in at least 5 different comments; the
# surviving words are exposed as cv_model.vocabulary and used as a filter below.
cv = CountVectorizer(inputCol="tok_output", outputCol="cv_output", minDF= 5)
cv_model = cv.fit(Q4_data_tok)
Q5_vectorized = cv_model.transform(Q4_data_tok)
cv_model.vocabulary
Q5_vectorized.show(3)

+------+-------------+--------------------+--------------------+--------------------+
|gender|     category|        comment_text|          tok_output|           cv_output|
+------+-------------+--------------------+--------------------+--------------------+
|     m|        Music|how you hug a pup...|[how, you, hug, a...|(297,[2,7,26],[1....|
|     m|Entertainment|               First|             [first]|   (297,[120],[1.0])|
|     m|Entertainment|I bet its so hot ...|[i, bet, its, so,...|(297,[1,2,8,14,15...|
+------+-------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [281]:
# One row per token across all comments, restricted to the CountVectorizer vocabulary.
# explode() unnests the token arrays in a distributed way; the previous version first
# collected every token list into a single row and flattened it with a Python UDF
# and an RDD round-trip, which forces the whole corpus through one task.
Q5_filtered_words = Q5_vectorized\
    .select(explode("tok_output").alias("word"))\
    .filter(col("word").isin(cv_model.vocabulary))

Q5_filtered_words.show(3)

+----+
|word|
+----+
| how|
| you|
|   a|
+----+
only showing top 3 rows



In [282]:
# Sanity check: count the occurrences of every kept word and look at the rarest ones.
# countByValue() keys are single-field Rows, so row[0] is the word itself.
Q5_word_freq_check = Q5_filtered_words.rdd.countByValue()
Q5_df_word_freq_check = pd.DataFrame(
    [[row[0], n] for row, n in Q5_word_freq_check.items()],
    columns=["word", "ocurances"],
)
# The smallest count among the kept words is 5, as expected.
Q5_df_word_freq_check.sort_values('ocurances', ascending=True).head(3)

Unnamed: 0,word,ocurances
83,wrong,5
274,time.,5
40,<3,5


In [283]:
# f_all = occurrences of a word / total number of kept tokens over all genders and categories.
# The denominator counts tokens after the vocabulary filter, not every raw token.
total_kept_words = Q5_filtered_words.count()
Q5_f_all = spark.createDataFrame(Q5_df_word_freq_check)\
    .withColumn("f_all", col('ocurances') / total_kept_words)\
    .sort('f_all', ascending = False)
Q5_f_all.show()

+----+---------+--------------------+
|word|ocurances|               f_all|
+----+---------+--------------------+
| the|      484|  0.0715658731332249|
|   i|      297|0.043915422149933464|
|   a|      266|0.039331657548425256|
|  is|      201|0.029720538222682242|
|  to|      192| 0.02838976785450244|
| and|      185| 0.02735472423480704|
|  of|      160|0.023658139878752035|
| you|      156|0.023066686381783233|
|that|      150|0.022179506136330032|
|this|      142| 0.02099659914239243|
|  in|      111|0.016412834540884224|
|  it|      103| 0.01522992754694662|
| for|       88|0.013011976933313619|
| was|       84|0.012420523436344817|
|like|       79|0.011681206565133816|
|  so|       77|0.011385479816649415|
|  on|       69|0.010202572822711814|
| not|       69|0.010202572822711814|
|  be|       66|0.009758982699985213|
|just|       64|0.009463255951500812|
+----+---------+--------------------+
only showing top 20 rows



In [284]:
# Next comes f_gc for gender 'f' and category "People & Blogs".
# The steps mirror those for f_all, restricted to the rows of that group.
Q5_vectorized_f_PnB = Q5_vectorized.where(
    (col("gender") == "f") & (col("category") == "People & Blogs")
)
Q5_vectorized_f_PnB.show(3)

+------+--------------+--------------------+--------------------+--------------------+
|gender|      category|        comment_text|          tok_output|           cv_output|
+------+--------------+--------------------+--------------------+--------------------+
|     f|People & Blogs|YOUR MAKEUP IS PR...|[your, makeup, is...|(297,[3,39],[1.0,...|
|     f|People & Blogs|           beauty���|         [beauty���]|         (297,[],[])|
|     f|People & Blogs|Angel!!!!! She lo...|[angel!!!!!, she,...|(297,[0,15,60,85]...|
+------+--------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [285]:
# One row per token of the female / "People & Blogs" comments, restricted to the
# CountVectorizer vocabulary. explode() unnests the token arrays directly in Spark,
# replacing the collect_list + Python UDF + RDD round-trip used before.
Q5_filtered_words_f_PnB = Q5_vectorized_f_PnB\
    .select(explode("tok_output").alias("word"))\
    .filter(col("word").isin(cv_model.vocabulary))

Q5_filtered_words_f_PnB.show(3)

+----+
|word|
+----+
|your|
|  is|
| she|
+----+
only showing top 3 rows



In [286]:
# Same tally as for the whole corpus, now over the female / "People & Blogs" tokens only.
# Counts below 5 are fine here: the threshold of 5 applies to the ENTIRE corpus,
# so a word may be rarer once we subset by gender and category.
Q5_word_freq_check_f_PnB = Q5_filtered_words_f_PnB.rdd.countByValue()
Q5_df_word_freq_check_PnB = pd.DataFrame(
    [[row[0], n] for row, n in Q5_word_freq_check_f_PnB.items()],
    columns=["word_f_PnB", "ocurances_f_PnB"],
)
Q5_df_word_freq_check_PnB.sort_values('ocurances_f_PnB', ascending=False).head(3)


Unnamed: 0,word_f_PnB,ocurances_f_PnB
59,i,15
26,the,13
86,a,12


In [287]:
# f_gc = occurrences of a word in the group / total number of kept tokens in the group.
subset_kept_words = Q5_filtered_words_f_PnB.count()
Q5_f_gc_f_PnB = spark.createDataFrame(Q5_df_word_freq_check_PnB)\
    .withColumn("f_gc", col('ocurances_f_PnB') / subset_kept_words)\
    .sort('f_gc', ascending = False)
Q5_f_gc_f_PnB.show(3)

+----------+---------------+-------------------+
|word_f_PnB|ocurances_f_PnB|               f_gc|
+----------+---------------+-------------------+
|         i|             15|0.05415162454873646|
|       the|             13|0.04693140794223827|
|         a|             12|0.04332129963898917|
+----------+---------------+-------------------+
only showing top 3 rows



In [288]:
# Next we compute R = f_gc / f_all. Q5_f_all and Q5_f_gc_f_PnB cover different sets of words,
# so Q5_f_all is first narrowed to the words that occur in the female / "People & Blogs" subset.
# distinct() keeps each word once: the source frame has one row per token occurrence,
# and duplicates would only bloat the isin() list built from this variable.
list_word_subset = [row.word for row in Q5_filtered_words_f_PnB.select('word').distinct().collect()]

In [289]:
# Alphabetical peek at the subset frequencies, for side-by-side comparison with the filtered f_all table.
Q5_f_gc_f_PnB.orderBy("word_f_PnB").show(3)

+----------+---------------+--------------------+
|word_f_PnB|ocurances_f_PnB|                f_gc|
+----------+---------------+--------------------+
|          |              7| 0.02527075812274368|
|         "|              1|0.003610108303249...|
|        "i|              1|0.003610108303249...|
+----------+---------------+--------------------+
only showing top 3 rows



In [290]:
# Keep only the corpus-wide frequency rows whose word also occurs in the
# female / "People & Blogs" subset, sorted alphabetically for inspection.
Q5_f_all_f_PnB = Q5_f_all.where(col('word').isin(list_word_subset)).sort('word')

Q5_f_all_f_PnB.show(3)

+----+---------+--------------------+
|word|ocurances|               f_all|
+----+---------+--------------------+
|    |       37|0.005470944846961407|
|   "|        9|0.001330770368179802|
|  "i|        5|7.393168712110011E-4|
+----+---------+--------------------+
only showing top 3 rows



In [291]:
# Final step: join the two frequency tables on the word and compute R = f_gc / f_all.

In [292]:
# Pair each subset word with its corpus-wide frequency, then rank by R = f_gc / f_all.
same_word = Q5_f_gc_f_PnB.word_f_PnB == Q5_f_all_f_PnB.word
Q5_answer = Q5_f_gc_f_PnB\
    .join(Q5_f_all_f_PnB, on=same_word, how='inner')\
    .drop(Q5_f_all_f_PnB.word)\
    .withColumn("R", col("f_gc") / col("f_all"))\
    .sort("R", ascending = False)
Q5_answer.show(3)

+----------+---------------+--------------------+---------+--------------------+------------------+
|word_f_PnB|ocurances_f_PnB|                f_gc|ocurances|               f_all|                 R|
+----------+---------------+--------------------+---------+--------------------+------------------+
|       too|              3|0.010830324909747292|        7|0.001035043619695...|10.463641052088706|
|       two|              2|0.007220216606498195|        5|7.393168712110011E-4| 9.766064981949459|
|      live|              2|0.007220216606498195|        6|8.871802454532013E-4| 8.138387484957882|
+----------+---------------+--------------------+---------+--------------------+------------------+
only showing top 3 rows



####  "too", "two", and "live" are the words with the highest R within the f_PnB subset (gender 'f' and category "People & Blogs").

### Thank you so much for the chance to do this challenge. It was a pretty challenging task, and I learned a lot in a short period of time. Many thanks to Ted and the team. Please feel free to email me at dannielwinarto@gmail.com with any further questions.