# CMU 10-405/605 Machine Learning with Large Datasets
## **Welcome to Recitation 3**
This notebook walks you through Spark transformations and actions. We will also process a large Wikipedia dump to find interesting insights about your favorite celebrities.

#### **Exploratory Data Analysis on a Wikipedia dump**

In [0]:
# Load the Wikipedia dataset
wikiRDD = sc.wholeTextFiles("/FileStore/tables/text/*/*")

# Get all the pages
pages = wikiRDD.flatMap(lambda x: x[1].split('</doc>'))


def get_title_and_content(content):
    """
      Generate a (title, actual content) tuple from the raw Wiki dump text of one page.
    """
    # Remove any leading or trailing whitespace
    content = content.strip()
    title, actual_content = '', ''
    try:
        if content != '':
            # Split on the first two newlines only, giving at most three parts
            arr = content.split("\n", 2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except IndexError:
        # The page has fewer than three lines; treat it as empty
        title, actual_content = '', ''
    return (title, actual_content)
  
  
# Extract the title and actual content
pages = pages.map(get_title_and_content).cache()


import re
def check_if_person(content):
    """ 
      Heuristically checks whether a page is about a person (celebrity) by looking
      for a full date, such as a birth date, near the start of the content.
    """
    # Only look at the first 150 characters
    content = content[:150]
    # Match dates like 12 August 1993
    list1 = re.findall(r"\d{1,2} [ADFJMNOS]\w* \d{4}", content)
    # Match dates like August 12, 1993
    list2 = re.findall(r"[ADFJMNOS]\w* \d{1,2}, \d{4}", content)
    return len(list1) > 0 or len(list2) > 0
  

# Get all the celebrity pages
celebrities = pages.filter(lambda x : check_if_person(x[1]))
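
To see what the two helper functions in the cell above do without a Spark cluster, the sketch below runs the same logic on one hand-written string. The sample text and the `<doc ...>` header line are assumptions about the dump layout (one header line, then the title, then the body), inferred from the comment that the second line is the title. The names `parse_page` and `looks_like_person` are hypothetical stand-ins for `get_title_and_content` and `check_if_person`.

```python
import re

# Hypothetical sample: two pages in the assumed layout
# (header line, title line, then the body), each closed by </doc>.
sample_dump = (
    '<doc id="1" title="Ada Example">\n'
    "Ada Example\n"
    "\n"
    "Ada Example (born 12 August 1993) is a computer scientist.\n"
    "</doc>\n"
    '<doc id="2" title="Example River">\n'
    "Example River\n"
    "\n"
    "The Example River flows through a valley.\n"
    "</doc>\n"
)


def parse_page(raw):
    """Return (title, body) for one raw page, or ('', '') if it is malformed."""
    parts = raw.strip().split("\n", 2)
    if len(parts) < 3:
        return ("", "")
    return (parts[1], parts[2])


def looks_like_person(body):
    """Heuristic from the notebook: a full date in the first 150 characters."""
    head = body[:150]
    day_first = re.findall(r"\d{1,2} [ADFJMNOS]\w* \d{4}", head)
    month_first = re.findall(r"[ADFJMNOS]\w* \d{1,2}, \d{4}", head)
    return bool(day_first or month_first)


# Splitting on </doc> leaves a trailing whitespace-only chunk after the last page.
parsed = [parse_page(raw) for raw in sample_dump.split("</doc>")]
people = [title for title, body in parsed if looks_like_person(body)]
```

Here `people` contains only the first title, since the second page has no date near its start. The chunk after the final `</doc>` becomes an empty `('', '')` record, so a count over all parsed pages likely includes one such empty entry per input file unless they are filtered out. The heuristic flags any page with an early date, so some non-person pages may pass it as well.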

In [0]:
print(pages.count())
print(celebrities.count())

What is the most common length of article names in Wikipedia? That is, get a list of tuples in the format (length of name, number of articles whose name has this length).

In [0]:
# What is the most common length of the name
pages.map(lambda x: (len(x[0]), x[0])).groupByKey().map(lambda x: (x[0], len(x[1]))).takeOrdered(10, lambda x: -x[1])

In [0]:
# Comparison with reduceByKey
pages.map(lambda x: (len(x[0]), 1)).reduceByKey(lambda x, y: x + y).takeOrdered(10, lambda x: -x[1])
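
The two cells above compute the same counts. The usual reason to prefer `reduceByKey` is that Spark can combine values within each partition before the shuffle, while `groupByKey` moves every value across the network first. The plain-Python sketch below imitates that difference on two hypothetical partitions of titles. It only models the idea and is not how Spark is implemented.

```python
from collections import Counter, defaultdict

# Two hypothetical partitions of page titles.
partitions = [
    ["Ada", "Bob", "Carl", "Dora"],
    ["Eve", "Finn", "Gus"],
]

# groupByKey style: every (length, title) pair is shuffled, then counted.
shuffled_pairs = [(len(t), t) for part in partitions for t in part]
grouped = defaultdict(list)
for length, title in shuffled_pairs:
    grouped[length].append(title)
counts_group = {length: len(titles) for length, titles in grouped.items()}

# reduceByKey style: each partition pre-aggregates, so only partial counts are shuffled.
partials = [Counter(len(t) for t in part) for part in partitions]
shuffled_partials = [(k, v) for partial in partials for k, v in partial.items()]
counts_reduce = Counter()
for length, partial_count in shuffled_partials:
    counts_reduce[length] += partial_count
```

Both approaches end with the same counts, but the first shuffles seven records and the second only four. On the real dataset the gap grows with the number of titles per partition.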

In [0]:
# What is the most common length of the names of the celebrities
celebrities.map(lambda x: (len(x[0]), 1)).reduceByKey(lambda x, y: x + y).takeOrdered(10, lambda x: -x[1])

Tag each celebrity with their profession: FILM DIRECTOR, COMPUTER SCIENTIST, POLITICIAN, or OTHER.

In [0]:
# Using RDDs
def tag_profession(celebrity):
    """
      Tag the profession of the celebrity by finding the profession name in the content.
    """
    if "film director" in celebrity[1]:
        profession = "FILM DIRECTOR"
    elif "computer scientist" in celebrity[1]:
        profession = "COMPUTER SCIENTIST"
    elif "politician" in celebrity[1]:
        profession = "POLITICIAN"
    else:
        profession = "OTHER"
        
    return (celebrity[0], profession)
        

celebs_with_profession_rdd = celebrities.map(tag_profession)

# take(10) fetches only ten records instead of collecting the whole RDD to the driver
celebs_with_profession_rdd.take(10)
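
Because the `if`/`elif` chain in `tag_profession` stops at the first match, a page that mentions several professions receives only the first one in the chain, and the substring test is case-sensitive. The sketch below restates the same rule as a loop and applies it to made-up records (the names and sentences are hypothetical) to show both effects.

```python
def tag_profession(celebrity):
    """Same rule as the cell above: the first matching keyword wins."""
    title, content = celebrity
    for keyword in ("film director", "computer scientist", "politician"):
        if keyword in content:
            return (title, keyword.upper())
    return (title, "OTHER")


# Hypothetical (title, content) records.
records = [
    ("Person A", "A film director who later became a politician."),
    ("Person B", "A politician and former computer scientist."),
    ("Person C", "Politician from a small town."),  # capital P: no match
]

tagged = [tag_profession(r) for r in records]
```

Person A is tagged FILM DIRECTOR and Person B COMPUTER SCIENTIST even though both texts also mention "politician", and Person C falls through to OTHER because of the capital letter. Lowercasing the content before matching would be one way to catch the last case.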

How can DataFrames help?

In [0]:
# Convert the celebs RDD to DataFrame
celebsDF = celebrities.toDF(["title", "content"])

In [0]:
# Display the DataFrame
celebsDF.printSchema()
celebsDF.show(10)

In [0]:
# Create a new profession column whose value is FILM DIRECTOR, COMPUTER SCIENTIST, POLITICIAN, ARTIST, or OTHER (first match wins)
from pyspark.sql import functions as F

celebs_with_profession = celebsDF.withColumn("profession", 
                           F.when(F.col("content").contains("film director"), "FILM DIRECTOR")
                           .when(F.col("content").contains("computer scientist"), "COMPUTER SCIENTIST")
                           .when(F.col("content").contains("politician"), "POLITICIAN")
                           .when(F.col("content").contains("artist"), "ARTIST")
                           .otherwise("OTHER")).select("title", "profession")

In [0]:
celebs_with_profession.show(10)

In [0]:
celebs_with_profession.groupBy("profession").count().orderBy('count').show()

Find the celebs whose articles mention museums.

In [0]:
celebs_in_museums = celebsDF.filter(F.col("content").contains("museum"))

celebs_in_museums.show(10)

Give the distribution of celebs featured in museums by profession. That is, count the number of celebs featured in museums for each profession.

In [0]:
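# Join on the column name so the result has a single, unambiguous title column
celebs_in_museum_with_profession = celebs_in_museums.join(celebs_with_profession, "title")\
                                                    .groupBy("profession").count().orderBy('count')

# show() returns None, so call it separately to keep the DataFrame in the variable
celebs_in_museum_with_profession.show()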

Get the significance of dates in a celebrity's Wiki article: the number of times a month name appears in the article, divided by the total number of paragraphs.

In [0]:
# Using RDDs 

In [0]:
def date_significance(celebrity):
    """
        Calculates significance of dates for a celebrity by counting the number of times a month name appears in the article divided by the total number of paragraphs.
    """
    lst_of_mnths = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
    month_cnt = [celebrity[1].count(month) for month in lst_of_mnths]
    
    return (celebrity[0], float(sum(month_cnt))/len(celebrity[1].split('\n\n')))
   

celebrities_with_date_signficance_rdd = celebrities.map(date_significance)

In [0]:
celebrities_with_date_signficance_rdd.filter(lambda x: x[1] > 1).count()
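
As a worked example of the metric, the sketch below applies the same formula to one short, made-up article with three paragraphs. The function name `date_significance_of_text` and the stricter `strict_date_significance` variant are hypothetical additions for illustration. The strict version uses word boundaries, which the notebook's version does not.

```python
import re

MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
          'August', 'September', 'October', 'November', 'December']


def date_significance_of_text(content):
    """Month-name substring occurrences divided by the number of paragraphs."""
    month_cnt = sum(content.count(month) for month in MONTHS)
    return float(month_cnt) / len(content.split('\n\n'))


def strict_date_significance(content):
    """Variant that counts month names only when they are whole words."""
    pattern = r"\b(?:" + "|".join(MONTHS) + r")\b"
    return float(len(re.findall(pattern, content))) / len(content.split('\n\n'))


# Hypothetical article with three paragraphs separated by blank lines.
article = "Born in May 1990.\n\nMaybe she moved in June.\n\nNo dates here."

score = date_significance_of_text(article)
strict_score = strict_date_significance(article)
```

The substring count finds "May" twice (once inside "Maybe") plus "June", giving 3 / 3 = 1.0, while the whole-word variant gives 2 / 3. Whether this difference matters depends on how the score is used afterwards.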

In [0]:
# Using DataFrames

In [0]:
from pyspark.sql.functions import udf

@udf("float")
def date_significance_udf(content):
    """
        Calculates significance of dates for a celebrity by counting the number of times a month name appears in the article divided by the total number of paragraphs.
    """
    lst_of_mnths = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
    month_cnt = [content.count(month) for month in lst_of_mnths]
    
    return float(sum(month_cnt))/len(content.split('\n\n'))
  
celebrities_with_date_signficance_df = celebsDF.withColumn('date_significance', date_significance_udf(celebsDF.content)).select("title", "date_significance")

In [0]:
celebrities_with_date_signficance_df.filter(celebrities_with_date_signficance_df.date_significance > 1).count()

Calculate the average date significance by profession, only for those cases where date significance > 1.

In [0]:
# Using RDDs : CASE 1

In [0]:
celebrities_with_date_signficance_rdd.join(celebs_with_profession_rdd).filter(lambda x: x[1][0] > 1).collect()

In [0]:
# Using RDDs : CASE 2

In [0]:
celebrities_with_date_signficance_rdd.filter(lambda x: x[1] > 1).join(celebs_with_profession_rdd).collect()
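
Case 1 joins first and filters afterwards, while Case 2 filters first and joins only the survivors. Both return the same records, but Case 2 feeds fewer records into the join. The plain-Python sketch below counts the join input in each case on hypothetical data. The `join` helper is a simplified stand-in for `RDD.join` that assumes unique keys.

```python
# Hypothetical (title, score) pairs and a title -> profession lookup.
significance = [("A", 2.5), ("B", 0.4), ("C", 1.2), ("D", 0.0)]
professions = {"A": "POLITICIAN", "B": "OTHER", "C": "FILM DIRECTOR", "D": "OTHER"}


def join(left, right):
    """Inner join of (key, value) pairs against a dict, assuming unique keys."""
    return [(k, (v, right[k])) for k, v in left if k in right]


# Case 1: join everything, then filter on the score.
case1_join_input = len(significance)
case1 = [row for row in join(significance, professions) if row[1][0] > 1]

# Case 2: filter on the score first, then join the survivors.
filtered = [(k, v) for k, v in significance if v > 1]
case2_join_input = len(filtered)
case2 = join(filtered, professions)
```

With RDDs, Spark runs the transformations in the order they are written, so placing the filter before the join is the programmer's job. With DataFrames, the query optimizer can often push such a filter below the join on its own.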

In [0]:
# Using DATAFRAMES: CASE 1

In [0]:
celebrities_with_date_signficance_df.join(celebs_with_profession, celebrities_with_date_signficance_df.title == celebs_with_profession.title).filter(celebrities_with_date_signficance_df.date_significance > 1).show()

In [0]:
# Using DATAFRAMES: CASE 2

In [0]:
celebrities_with_date_signficance_df.filter(celebrities_with_date_signficance_df.date_significance > 1).join(celebs_with_profession, celebrities_with_date_signficance_df.title == celebs_with_profession.title).show()

In [0]:
celebrities_with_date_signficance_df.cache()

In [0]:
celebrities_with_date_signficance_df.filter(celebrities_with_date_signficance_df.date_significance > 1).join(celebs_with_profession, celebrities_with_date_signficance_df.title == celebs_with_profession.title).show()

TODO: Find the average using groupBy() on profession
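
As a hint for the shape of the computation, here is a plain-Python sketch of a per-profession average on hypothetical rows. It keeps a running (sum, count) per key, which is the same idea a DataFrame `groupBy("profession")` followed by `agg(F.avg("date_significance"))` would express. The rows and values are made up and it is not the Spark solution itself.

```python
from collections import defaultdict

# Hypothetical joined rows: (title, date_significance, profession).
rows = [
    ("A", 2.0, "POLITICIAN"),
    ("B", 4.0, "POLITICIAN"),
    ("C", 1.5, "OTHER"),
    ("D", 0.5, "OTHER"),  # dropped by the > 1 filter
]

totals = defaultdict(lambda: [0.0, 0])  # profession -> [sum, count]
for _, score, profession in rows:
    if score > 1:
        totals[profession][0] += score
        totals[profession][1] += 1

averages = {p: s / n for p, (s, n) in totals.items()}
```

Row D is excluded before averaging, so OTHER averages over a single row.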

In [0]:
celebrities_with_date_signficance_df.filter(celebrities_with_date_signficance_df.date_significance == 0.0).count()