### Q1. How do we create RDDs in Spark?

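RDD stands for Resilient Distributed Dataset. RDDs are resilient because Spark keeps track of the lineage of transformations that built each RDD, so a lost partition can always be recomputed from its source data. They are distributed because they are collections of objects partitioned across, and computed on, different nodes of the cluster.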

There are mainly three ways to create an RDD in Spark.
- Parallelizing an existing collection in the driver program, for example converting a list that was already created in the driver into an RDD with `sc.parallelize()`.
- Referencing a dataset in an external storage system (e.g. HDFS, HBase, a shared file system, or any data source offering a Hadoop InputFormat).
- Creating a new RDD from an existing RDD by applying a transformation.

### Q2. What are two major categories of transformations on RDDs that Spark provides?

Spark provides two types of operations on RDDs: transformations and actions. Transformations are lazy: Spark records them but does not compute anything until an action is applied.

- Transformations: operations applied to an RDD that produce a new RDD. `filter`, `groupBy` and `map` are examples of transformations.

- Actions: operations that instruct Spark to run the computation and return the result to the driver (or write it to external storage). `collect`, `count` and `reduce` are examples of actions.

### Q3.

```python
df_game = spark.read.option("header", True).csv("data/game.csv")
df_game_teams_stats = spark.read.option("header", True).csv("data/game_teams_stats.csv")
df_team_info = spark.read.option("header", True).csv("data/team_info.csv")
```

1. Select two columns, game ID and season, and add a column with the total goals (the sum of home and away goals). Suggestion: use the `df.withColumn()` function.

```python
df_game_with_total_goals = df_game.select('game_id', 'season', 'away_goals', 'home_goals')
df_game_with_total_goals = (
    df_game_with_total_goals
    .withColumn('total_goals',
                df_game_with_total_goals.away_goals + df_game_with_total_goals.home_goals)
    .select('game_id', 'season', 'total_goals')
)
```

2. Sort the records in ascending order (by season).

```python
df_game_with_total_goals = df_game_with_total_goals \
    .orderBy(["season", "total_goals"], ascending=True)
```

3. Add columns with the average, minimum and maximum total score for each season. Suggestion: use a window function.

```python
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql import Window

window = Window.partitionBy("season")

df_game_with_total_goals = (
    df_game_with_total_goals
    .withColumn('avg_season_goals', F.bround(F.avg('total_goals').over(window), 2))
    .withColumn('min_season_goals', F.min('total_goals').over(window))
    .withColumn('max_season_goals', F.max('total_goals').over(window))
    .orderBy(["season", "total_goals"], ascending=True)
)
```

4. Add a column with the difference between each game's total score and the average for that season. Suggestion: use a window function.

```python
df_game_with_total_goals = (
    df_game_with_total_goals
    .withColumn('diff_goals_to_avg_season_goals',
                F.bround(df_game_with_total_goals.total_goals -
                         F.avg('total_goals').over(window), 2))
    .orderBy(["season", "total_goals"], ascending=True)
)
```

5. Print the first 10 records.

```python
df_game_with_total_goals.show(10)
```

```
+----------+--------+-----------+----------------+----------------+----------------+------------------------------+
|   game_id|  season|total_goals|avg_season_goals|min_season_goals|max_season_goals|diff_goals_to_avg_season_goals|
+----------+--------+-----------+----------------+----------------+----------------+------------------------------+
|2010020966|20102011|        1.0|            5.59|             1.0|            15.0|                         -4.59|
|2010030121|20102011|        1.0|            5.59|             1.0|            15.0|                         -4.59|
|2010020761|20102011|        1.0|            5.59|             1.0|            15.0|                         -4.59|
|2010030411|20102011|        1.0|            5.59|             1.0|            15.0|                         -4.59|
|2010030147|20102011|        1.0|            5.59|             1.0|            15.0|                         -4.59|
|2010030124|20102011|        1.0|            5.59|             1.0|     
```

### Q4.

1. List the names (`teamName`) of all teams that played as the away team at TD Garden during the 2012-2013 and 2013-2014 seasons.

```python
df_away_teams_season_12_and_13_at_td_garden = (
    df_game.select('season', 'away_team_id', 'venue')
    .join(df_team_info.select('team_id', 'teamName'),
          df_game.away_team_id == df_team_info.team_id,
          how='left')
    .drop('team_id')
    .filter((F.col("venue") == 'TD Garden') &
            ((F.col("season") == 20122013) |
             (F.col("season") == 20132014)))
)
(df_away_teams_season_12_and_13_at_td_garden
    .select('teamName').rdd.flatMap(lambda x: x).collect())
```

```
['Rangers',
 'Rangers',
 'Rangers',
 'Penguins',
 'Penguins',
 'Maple Leafs',
 'Maple Leafs',
 'Maple Leafs',
 'Maple Leafs',
 'Blackhawks',
 'Blackhawks',
 'Blackhawks',
 'Red Wings',
 'Red Wings',
 'Red Wings',
 'Canadiens',
 'Canadiens',
 'Canadiens',
 'Canadiens',
 'Hurricanes',
 'Red Wings',
 'Panthers',
 'Panthers',
 'Panthers',
 'Islanders',
 'Senators',
 'Lightning',
 'Capitals',
 'Sabres',
 'Senators',
 'Flyers',
 'Sabres',
 'Flames',
 'Ducks',
 'Panthers',
 'Islanders',
 'Coyotes',
 'Panthers',
 'Blue Jackets',
 'Sharks',
 'Lightning',
 'Rangers',
 'Blues',
 'Hurricanes',
 'Canadiens',
 'Predators',
 'Capitals',
 'Senators',
 'Hurricanes',
 'Penguins',
 'Islanders',
 'Jets',
 'Lightning',
 'Wild',
 'Kings',
 'Red Wings',
 'Maple Leafs',
 'Jets',
 'Capitals',
 'Devils',
 'Canadiens',
 'Blackhawks',
 'Avalanche',
 'Penguins',
 'Rangers',
 'Maple Leafs',
 'Maple Leafs',
 'Senators',
 'Flyers',
 'Rangers',
 'Devils',
 'Blue Jackets',
 'Canadiens',
 'Senators',
 'Penguins',
 'Sabr
```

2. How many unique teams are on the list?

```python
(df_away_teams_season_12_and_13_at_td_garden
    .select('teamName').distinct().count())
```

```
29
```

### Q5. 

Create a DataFrame with three columns: Student, Subject and Score. The student names are Demar and Kawhi, and the subjects are math, history and science. The score can be any number between 0 and 100. The result should be a DataFrame with 6 rows and 3 columns.

```python
df_student_subject_score = spark.createDataFrame(
    [('Demar', 'math'),
     ('Demar', 'history'),
     ('Demar', 'science'),
     ('Kawhi', 'math'),
     ('Kawhi', 'history'),
     ('Kawhi', 'science')],
    ["Student", "Subject"])
```

```python
df_student_subject_score = df_student_subject_score \
    .withColumn('Score', F.bround(F.rand() * 100, 0))
```

Use a user-defined function (UDF) to add a category column to the DataFrame: if the score is higher than 70, the category is 'Good'; if it is between 50 and 70, the category is 'Ok'; if it is lower than 50, the category is 'Not good'.

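```python
from pyspark.sql.types import StringType

def create_score_category(score):
    # map a numeric score to a category; exactly 50 and 70 count as "Ok"
    if score is None:
        return None
    if score > 70:
        return "Good"
    elif score < 50:
        return "Not good"
    else:
        return "Ok"

# wrap the Python function as a Spark UDF that returns a string column
spark_udf = F.udf(create_score_category, StringType())

df_student_subject_score = df_student_subject_score \
    .withColumn('Score_Category', spark_udf('Score'))
```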

```python
df_student_subject_score.show()
```

```
+-------+-------+-----+--------------+
|Student|Subject|Score|Score_Category|
+-------+-------+-----+--------------+
|  Demar|   math|  6.0|      Not good|
|  Demar|history| 26.0|      Not good|
|  Demar|science| 58.0|            Ok|
|  Kawhi|   math|  1.0|      Not good|
|  Kawhi|history| 58.0|            Ok|
|  Kawhi|science| 96.0|          Good|
+-------+-------+-----+--------------+
```



### Q6.

Create a function that, given a number n, returns a list of the prime numbers between 1 and n.
Test your function with n = 17.

```python
def is_it_prime(number):
    # work with the absolute integer value of the input
    number = abs(int(number))
    # 0 and 1 are not prime
    if number < 2:
        return False
    # 2 is the only even prime
    if number == 2:
        return True
    # other even numbers aren't prime
    if number % 2 == 0:
        return False
    # check odd divisors up to the square root
    for x in range(3, int(number ** 0.5) + 1, 2):
        if number % x == 0:
            return False
    # no divisor found, so the number is prime
    return True


def get_prime_numbers_from_1_to_n(n):
    # distribute the numbers 1..n (inclusive) across the cluster
    numbers = sc.parallelize(range(1, n + 1))
    # keep only the primes and return them to the driver as a list
    return numbers.filter(is_it_prime).collect()
```

```python
get_prime_numbers_from_1_to_n(17)
```

```
[2, 3, 5, 7, 11, 13, 17]
```
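For a small n like 17, distributing the work with Spark adds more overhead than it saves. A minimal driver-side alternative, swapping in the Sieve of Eratosthenes instead of trial division (the function name `primes_up_to` is hypothetical):

```python
def primes_up_to(n):
    """Return the primes in [1, n] using the Sieve of Eratosthenes."""
    if n < 2:
        return []
    is_prime = [True] * (n + 1)
    is_prime[0] = is_prime[1] = False
    p = 2
    while p * p <= n:
        if is_prime[p]:
            # every multiple of p from p*p upward is composite
            for multiple in range(p * p, n + 1, p):
                is_prime[multiple] = False
        p += 1
    return [i for i, flag in enumerate(is_prime) if flag]


print(primes_up_to(17))  # [2, 3, 5, 7, 11, 13, 17]
```

Starting each inner loop at `p * p` is safe because smaller multiples of `p` were already crossed out by smaller primes.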
