In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql import Row
from helpers.helper_functions import *
import matplotlib.pyplot as plt
# Imported explicitly (and after the star imports) so `pd` is always pandas.
import pandas as pd

# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# SparkContext of that session; used later for the RDD-based text loading.
sc = spark.sparkContext

# Load dataset

In [2]:
# Read the two .json.gz datasets into Spark DataFrames.
Bombing_Operations = spark.read.json(
    path="data_templates/Bombing_Operations.json.gz",
)
Aircraft_Glossary = spark.read.json(
    path="data_templates/Aircraft_Glossary.json.gz",
)

# Grouping

In [3]:
# Keep only the mission date (converted with to_date) and the flying country.
# "ContryFlyingMission" is the column name as spelled in the dataset.
missions_countries = Bombing_Operations.selectExpr(
    "to_date(MissionDate) as MissionDate",
    "ContryFlyingMission",
)

# Count missions per (date, country), order by date, and bring the result
# to the driver as a pandas DataFrame.
missions_by_date = (
    missions_countries
    .groupBy("MissionDate", "ContryFlyingMission")
    .agg(count("*").alias("MissionsCount"))
    .orderBy(asc("MissionDate"))
    .toPandas()
)
missions_by_date.head()

Unnamed: 0,MissionDate,ContryFlyingMission,MissionsCount
0,1965-10-01,UNITED STATES OF AMERICA,447
1,1965-10-02,UNITED STATES OF AMERICA,652
2,1965-10-03,UNITED STATES OF AMERICA,608
3,1965-10-04,UNITED STATES OF AMERICA,532
4,1965-10-05,UNITED STATES OF AMERICA,697


# Iterating

In [None]:
# Draw one line per country showing its mission count per date.
fig, ax = plt.subplots()
for country, missions in missions_by_date.groupby("ContryFlyingMission"):
    ax.plot(missions["MissionDate"], missions["MissionsCount"], label=country)

# The per-country labels passed to plot() only become visible once a legend
# is drawn; axis labels let the figure stand on its own.
ax.set_xlabel("MissionDate")
ax.set_ylabel("MissionsCount")
ax.legend()
plt.show()

# Map/Reduce

In [None]:
# NOTE(review): jun_29_operations is not defined in any cell visible here;
# it must exist in the kernel before this cell runs.

# Map step: emit a (takeoff location, 1) pair for every row.
all_locations = jun_29_operations.rdd.map(
    lambda operation: (operation.TakeoffLocation, 1)
)
all_locations.take(3)

# Reduce step: add up the ones per location, then order by descending count.
locations_counts_rdd = (
    all_locations
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)
locations_counts_rdd.take(3)

# Convert RDD to df

In [None]:
# Wrap each (location, count) pair in a Row so the column names travel with
# the data when the DataFrame is built.
locations_counts_with_schema = locations_counts_rdd.map(
    lambda pair: Row(TakeoffLocation=pair[0], MissionsCount=pair[1])
)
locations_counts = spark.createDataFrame(locations_counts_with_schema)
locations_counts.show()

# Tokenizer, stopwords_remover

## Explode words from list of words

In [None]:
# One row per token: flatten every list found in the "words" column.
# NOTE(review): reddit_with_tokens is not defined in any cell visible here.
all_words = reddit_with_tokens.select(explode("words").alias("word"))

# Count occurrences of each word and keep the 50,000 most frequent ones.
top50k = (
    all_words
    .groupBy("word")
    .agg(count("*").alias("total"))
    .orderBy(col("total").desc())
    .limit(50000)
)

top50k.show()

## Create a representation of each subreddit from the top-50k words that occur in it: 1 subreddit - all its words

In [None]:
# Collect on the driver one (subreddit, list of words) pair per subreddit.
# groupByKey gathers the values of each key directly, whereas concatenating
# lists inside reduceByKey copies the ever-growing list on every merge.
# NOTE(review): filtered_tokens is not defined in any cell visible here.
subreddit_50k = (
    filtered_tokens.rdd
    .map(lambda row: (row.subreddit, row.word))
    .groupByKey()
    .mapValues(list)
    .collect()
)

## Jaccard similarity heatmap

In [None]:
def jaccard_similarity(list1, list2):
    """Return the Jaccard similarity of two collections of hashable items.

    Both inputs are converted to sets (so duplicates are ignored) and the
    result is the size of their intersection divided by the size of their
    union.

    Args:
        list1: First iterable of hashable items.
        list2: Second iterable of hashable items.

    Returns:
        A float between 0 and 1. Two empty inputs are treated as identical
        and yield 1.0 instead of raising ZeroDivisionError.
    """
    s1 = set(list1)
    s2 = set(list2)
    union_size = len(s1 | s2)
    if union_size == 0:
        # Both inputs are empty: 0/0 is undefined, so fall back to the usual
        # convention that two empty sets are fully similar.
        return 1.0
    return len(s1 & s2) / union_size

In [None]:
# Jaccard similarity is symmetric, so each unordered pair of subreddits is
# scored once and the value is reused for both (a, b) and (b, a).
# The word lists are turned into sets once up front instead of on every comparison.
word_sets = [(name, set(words)) for name, words in subreddit_50k]

pair_scores = {}
for position, (name_a, words_a) in enumerate(word_sets):
    for name_b, words_b in word_sets[position:]:
        score = jaccard_similarity(words_a, words_b)
        pair_scores[(name_a, name_b)] = score
        pair_scores[(name_b, name_a)] = score

# Same (row, column, value) triples, in the same order, as a full double loop.
similarity = [
    (name_a, name_b, pair_scores[(name_a, name_b)])
    for name_a, _ in word_sets
    for name_b, _ in word_sets
]

# Reshape the long list of triples into a square subreddit x subreddit matrix.
similarity_matrix_50k_words = pd.DataFrame(similarity).pivot(index=0, columns=1, values=2)
plot_heatmap(similarity_matrix_50k_words)

# vectorized bag of words representation for each dialogue line

In [17]:
# Build an RDD of [character, list of words] records, one per dialogue line.
# sc.textFile already yields one record per line, so no extra split on '\n'
# is needed.
df_rdd = (
    sc.textFile('data_templates/all_scripts.txt')
    # Lines starting with '>' are not dialogue and are dropped.
    .filter(lambda line: not line.startswith('>'))
    # Split "Character: text" on the first ':' only, trimming spaces.
    .map(lambda line: [part.strip(' ') for part in line.split(':', 1)])
    # Skip lines without a ':' (e.g. blank lines); they have no text part
    # and would otherwise raise IndexError in the next step.
    .filter(lambda parts: len(parts) == 2)
    # Replace every character in EXCLUDE_CHARS with a space, then lowercase.
    .map(lambda parts: [
        parts[0],
        ''.join(char if char not in EXCLUDE_CHARS else ' ' for char in parts[1]).lower(),
    ])
    # Tokenise on single spaces and keep only words longer than one character
    # (this also drops the empty strings produced by consecutive spaces).
    .map(lambda parts: [
        parts[0],
        [word for word in parts[1].split(' ') if len(word) > 1],
    ])
)

In [29]:
# Turn the [character, word list] records into a two-column DataFrame.
df_lines = spark.createDataFrame(df_rdd, schema=('character', 'BoW'))
df_lines.show()

+------------+--------------------+
|   character|                 BoW|
+------------+--------------------+
|     Sheldon|[so, if, photon, ...|
|     Leonard|[agreed, what, yo...|
|     Sheldon|[there, no, point...|
|     Leonard|        [excuse, me]|
|Receptionist|          [hang, on]|
|     Leonard|[one, across, is,...|
|Receptionist|    [can, help, you]|
|     Leonard|[yes, um, is, thi...|
|Receptionist|[if, you, have, t...|
|     Sheldon|[think, this, is,...|
|Receptionist|  [fill, these, out]|
|     Leonard|[thank, you, we, ...|
|Receptionist|[oh, take, your, ...|
|     Sheldon|[leonard, don, th...|
|     Leonard|[what, are, you, ...|
|     Sheldon|[no, we, are, com...|
|     Leonard|[sheldon, this, w...|
|     Sheldon|[know, and, do, y...|
|     Leonard|[sure, she, ll, s...|
|     Sheldon|            [wouldn]|
+------------+--------------------+
only showing top 20 rows



# Bag of words for each person

In [22]:
# Count how often each (character, word) pair occurs over all dialogue lines.
df_rdd_ = (
    df_rdd
    .flatMap(lambda line: [((line[0], token), 1) for token in line[1]])
    .reduceByKey(lambda left, right: left + right)
)

# Re-key by character and merge the (word, count) pairs into one list each.
df_rdd_ = (
    df_rdd_
    .map(lambda entry: (entry[0][0], [(entry[0][1], entry[1])]))
    .reduceByKey(lambda left, right: left + right)
)

df = spark.createDataFrame(df_rdd_, ('character', 'BoW'))
df.show()

+------------------+--------------------+
|         character|                 BoW|
+------------------+--------------------+
|           Sheldon|[[photon, 1], [is...|
|           Leonard|[[point, 43], [ex...|
|            Howard|[[till, 3], [this...|
|             Voice|[[lost, 1], [was,...|
|               Man|[[in, 8], [am, 3]...|
|            Lesley|[[leonard, 7], [h...|
|    Howard’s phone|[[say, 2], [call,...|
|            Waiter|[[oh, 1], [where,...|
|          Together|[[yes, 1], [at, 1...|
|              Toby|[[was, 3], [more,...|
|            Leslie|[[leonard, 10], [...|
|             Missy|[[nice, 2], [meet...|
|             Woman|[[have, 1], [own,...|
|             Nurse|[[fill, 3], [this...|
|       Gablehauser|[[am, 1], [very, ...|
|           Warrior|[[again, 1], [leo...|
|Leonard and Howard|[[don, 4], [oh, 1...|
|          DMV Lady|[[take, 2], [this...|
|             Steph|[[maybe, 2], [we,...|
|            Kripke|[[hofstadter, 4],...|
+------------------+--------------

# Spark groupby

In [31]:
# Load both CSV files using their first row as the header. No schema is
# supplied or inferred, so every column comes back as a string.
Scaled_pokemons = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('data_templates/pokemon.csv')
)
Scaled_combats = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('data_templates/combats.csv')
)

Scaled_pokemons.printSchema()
Scaled_combats.printSchema()

root
 |-- pid: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Class 1: string (nullable = true)
 |-- Class 2: string (nullable = true)
 |-- HP: string (nullable = true)
 |-- Attack: string (nullable = true)
 |-- Defense: string (nullable = true)
 |-- Sp. Atk: string (nullable = true)
 |-- Sp. Def: string (nullable = true)
 |-- Speed: string (nullable = true)
 |-- Legendary: string (nullable = true)

root
 |-- First_pokemon: string (nullable = true)
 |-- Second_pokemon: string (nullable = true)
 |-- Winner: string (nullable = true)



In [32]:
# Count wins per pokemon id and keep the ten ids with the most victories.
# 'count(1)' is the column name Spark generates for the un-aliased count('*').
Victory_numbers = (
    Scaled_combats
    .groupby('Winner')
    .agg(count('*'))
    .orderBy(desc('count(1)'))
    .limit(10)
)

# Join the two datasets on pid == Winner to attach each winner's attributes,
# including its Name.
Pokemons_names_victories = Victory_numbers.join(
    Scaled_pokemons,
    on=Scaled_pokemons.pid == Victory_numbers.Winner,
)

Pokemons_names_victories.show()

+------+--------+---+------------------+-------+--------+---+------+-------+-------+-------+-----+---------+
|Winner|count(1)|pid|              Name|Class 1| Class 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Legendary|
+------+--------+---+------------------+-------+--------+---+------+-------+-------+-------+-----+---------+
|   154|     136|154|        Aerodactyl|   Rock|  Flying| 80|   105|     65|     60|     75|  130|    FALSE|
|   155|     127|155|   Mega Aerodactyl|   Rock|  Flying| 80|   135|     85|     70|     95|  150|    FALSE|
|   163|     152|163|            Mewtwo|Psychic|    null|106|   110|     90|    154|     90|  130|     TRUE|
|   214|     130|214|           Murkrow|   Dark|  Flying| 60|    85|     42|     85|     42|   91|    FALSE|
|   249|     128|249|     Mega Houndoom|   Dark|    Fire| 75|    90|     90|    140|     90|  115|    FALSE|
|   314|     133|314|           Slaking| Normal|    null|150|   160|    100|     95|     65|  100|    FALSE|
|   394|     130|39

In [34]:
# Order the joined rows by victory count (highest first) and bring just the
# Name column back to the driver.
Names = (
    Pokemons_names_victories
    .orderBy(desc('count(1)'))
    .select('Name')
    .collect()
)
Names_list = [row.Name for row in Names]
print(Names_list)

['Mewtwo', 'Aerodactyl', 'Infernape', 'Jirachi', 'Slaking', 'Deoxys Speed Forme', 'Murkrow', 'Mega Absol', 'Mega Houndoom', 'Mega Aerodactyl']
