In [1]:
import os
import pyspark
conf = pyspark.SparkConf()

conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')
sc = pyspark.SparkContext(conf=conf)

spark = pyspark.SQLContext.getOrCreate(sc)
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import regexp_replace, trim, col, lower,date_format,split,explode,desc,hour
from pyspark.sql.types import StructType,StructField, StringType,DoubleType
from pyspark.sql import Window

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/20 22:47:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Question 1. 

In [2]:
bakery = spark.read.option("inferSchema",True).option("header",True).csv("Bakery.csv")

                                                                                

In [3]:
bakery = bakery.withColumn("time", date_format('DateTime', 'HH:mm:ss'))
bakery = bakery.withColumn("date", date_format('DateTime', 'yyyy-MM-dd'))
bakery = bakery.withColumn("hour", hour(F.to_timestamp("time","HH:mm:ss")))
bakery.groupBy("Items","date","hour").count().orderBy("date")

                                                                                

Items,date,hour,count
Coke,2016-01-11,13,1
Medialuna,2016-01-11,16,1
Cookies,2016-01-11,15,1
Bread,2016-01-11,15,1
Jam,2016-01-11,14,1
Tea,2016-01-11,15,2
Eggs,2016-01-11,16,1
Tea,2016-01-11,8,3
Victorian Sponge,2016-01-11,9,1
Tea,2016-01-11,11,1


### Question 2.

In [9]:
bakery2 = bakery.groupBy("Items","DayType", "Daypart").count()

In [10]:
column_list = ["Items","DayType","Daypart"]
window = Window.partitionBy(bakery2['DayType'],bakery2['Daypart']).orderBy(bakery2['count'].desc())

In [11]:
bakery2 = bakery2.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 3)

In [12]:
bakery2 =bakery2.groupBy("DayType","Daypart").agg(F.collect_list("Items"))
bakery2.show(truncate=False)

                                                                                

+-------+---------+------------------------------------------+
|DayType|Daypart  |collect_list(Items)                       |
+-------+---------+------------------------------------------+
|Weekday|Afternoon|[Coffee, Bread, Tea]                      |
|Weekday|Evening  |[Coffee, Bread, Tea]                      |
|Weekday|Morning  |[Coffee, Bread, Pastry]                   |
|Weekday|Night    |[Valentine's card, Juice, Mineral water]  |
|Weekend|Afternoon|[Coffee, Bread, Tea]                      |
|Weekend|Evening  |[Tshirt, Coffee, Afternoon with the baker]|
|Weekend|Morning  |[Coffee, Bread, Pastry]                   |
|Weekend|Night    |[Vegan Feast, Hot chocolate, Scandinavian]|
+-------+---------+------------------------------------------+



### Question 3.

In [13]:
restaurants = spark.read.option("inferSchema",True).option("header",True).option("delimiter",";").csv("Restaurants_in_Durham_County_NC.csv")

In [15]:
restaurants.groupBy("Rpt_Area_Desc").count().show()

+--------------------+-----+
|       Rpt_Area_Desc|count|
+--------------------+-----+
|  Bed&Breakfast Home|    4|
|        Summer Camps|    4|
|        Institutions|   30|
|   Local Confinement|    2|
|         Mobile Food|  147|
|    School Buildings|   89|
|                null|   13|
|         Summer Food|  242|
|      Swimming Pools|  420|
|            Day Care|  173|
|Tattoo Establishm...|   32|
|    Residential Care|  154|
|   Bed&Breakfast Inn|    2|
|      Adult Day Care|    5|
|             Lodging|   62|
|        Food Service| 1093|
+--------------------+-----+



### Question 4.

In [16]:
population = spark.read.option("inferSchema",True).option("header",True).csv("populationbycountry19802010millions.csv")
population = population.withColumnRenamed("_c0","Country")

In [17]:
population1 = population.select(
    'Country',
    *[F.round((((F.col(str(year)) - F.col(str(int(year)-1))) / F.col(str(int(year)-1)))*100),3).alias(year)
      for year in population.columns[2:]]
)

In [18]:
pivot_df = population1.groupby(population1.columns[1:]) \
                        .pivot('Country') \
                        .max() 

22/03/20 22:50:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 
 Schema: _c0
Expected: _c0 but found: 
CSV file: file:///home/jovyan/populationbycountry19802010millions.csv


In [19]:
schema = StructType([
  StructField('year', StringType(), True),
  StructField('Country', StringType(), True),
  StructField('Max increase', StringType(), True)
  ])
df = spark.createDataFrame([],schema)

In [20]:
for year in population1.columns[1:]:
    df1 = population1.select("Country",year).orderBy(col(year).desc()).limit(1)
    df1
    df_row = spark.createDataFrame([(year,df1.collect()[0][0],df1.collect()[0][1])], schema)
    df = df.union(df_row)

22/03/20 22:51:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1980, 1981
 Schema: _c0, 1980, 1981
Expected: _c0 but found: 
CSV file: file:///home/jovyan/populationbycountry19802010millions.csv
22/03/20 22:51:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1980, 1981
 Schema: _c0, 1980, 1981
Expected: _c0 but found: 
CSV file: file:///home/jovyan/populationbycountry19802010millions.csv
22/03/20 22:51:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1981, 1982
 Schema: _c0, 1981, 1982
Expected: _c0 but found: 
CSV file: file:///home/jovyan/populationbycountry19802010millions.csv
22/03/20 22:51:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1981, 1982
 Schema: _c0, 1981, 1982
Expected: _c0 but found: 
CSV file: file:///home/jovyan/populationbycountry19802010millions.csv
22/03/20 22:51:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header:

In [21]:
df.show()

                                                                                

+----+--------------------+------------+
|year|             Country|Max increase|
+----+--------------------+------------+
|1981|      Western Sahara|      12.133|
|1982|      Western Sahara|      11.115|
|1983|       French Guiana|      14.286|
|1984|               Qatar|      10.964|
|1985|       French Guiana|        12.5|
|1986|               Qatar|       8.772|
|1987|       French Guiana|      11.111|
|1988|      Cayman Islands|       11.01|
|1989|United Arab Emirates|        6.12|
|1990|            Djibouti|      12.824|
|1991|              Jordan|      11.274|
|1992|              Kuwait|      48.633|
|1993|         Afghanistan|      13.225|
|1994|         Afghanistan|       8.728|
|1995|             Burundi|       7.222|
|1996|              Rwanda|      19.614|
|1997|Falkland Islands ...|        21.5|
|1998|             Liberia|      12.017|
|1999|Falkland Islands ...|       7.692|
|2000|          Montserrat|      16.864|
+----+--------------------+------------+
only showing top

### Question 5.

In [23]:
def alphanumeric(column):
    return trim(lower(regexp_replace(column, '([^0-9a-zA-Z])+', ' '))).alias('sentence')
file_path = 'internet_archive_scifi_v3.txt'
text_file = spark.read.option("lineSep", ".").text(file_path).select(alphanumeric(col('value')))

In [24]:
text_file1 = (text_file
                    .select(split(text_file.sentence, '\s+').alias('split')))
text_file2 = (text_file1
                    .select(explode(text_file1.split).alias('word')))

In [25]:
def wordCount(file):
     return (file
                .groupBy('word').count())
word_count = wordCount(text_file2)
topwords = word_count.orderBy("word")

topwords.show()

                                                                                

+------+-----+
|  word|count|
+------+-----+
|      |91509|
|     0|   10|
|    00|   52|
|   000|   40|
| 000th|    1|
|    03|    1|
|    05|    1|
|   060|    1|
|    07|    1|
|     1|  150|
|    10|   29|
|   100|    5|
|  1000|    4|
|1000th|    1|
|   101|    5|
|   102|    5|
|   103|    4|
|   104|    5|
|   105|    6|
|   106|    3|
+------+-----+
only showing top 20 rows



### Question 6.

In [27]:
import re
def alphanumeric(text):
    text = re.sub("[^0-9a-zA-Z ]", " ", text)
    return text

rdd = sc.textFile("internet_archive_scifi_v3.txt")
row = rdd.flatMap(lambda x : x.split(". "))
row = row.map(alphanumeric).map(lambda x : x.lower())

def bigram(words):
    bigrams = []
    for i in range(len(words) - 1):
        bigrams.append((tuple(words[i:i+2]), 1))
    return bigrams

bigrams = row.map(lambda s : s.split()).flatMap(bigram)
freq_bigrams = bigrams.reduceByKey(lambda x, y: x + y).map(lambda x : (x[1], x[0])).sortByKey(False)

                                                                                

In [28]:
topcount = freq_bigrams.collect()

                                                                                

In [29]:
topcount[:10]

[(157766, ('of', 'the')),
 (101616, ('in', 'the')),
 (69666, ('to', 'the')),
 (57452, ('on', 'the')),
 (56108, ('it', 'was')),
 (46022, ('and', 'the')),
 (44147, ('don', 't')),
 (43626, ('at', 'the')),
 (39428, ('to', 'be')),
 (38236, ('he', 'was'))]

### Question 7.

In [30]:
durham = spark.read.json("durham-nc-foreclosure-2006-2016.json")

In [31]:
durham = durham.filter(durham.fields.geocode.isNotNull())

In [32]:
restaurants1 = restaurants.filter((restaurants.Status == "ACTIVE") & (restaurants.Rpt_Area_Desc == "Food Service") & (restaurants.geolocation.isNotNull()))

In [33]:
durham1 = durham.join(restaurants1)

In [34]:
import haversine
def findDistance(geoClosure,geoRestaurant):
    lat1,long1 = geoClosure
    lat2,long2 = geoRestaurant.split(",")
    lat2 = lat2.strip()
    long2.strip()
    distance = haversine.haversine((float(lat1),float(long1)),(float(lat2),float(long2)))
    return distance
UDF = udf(findDistance)  
durham2 = durham1.withColumn("distance", F.round(UDF(col("fields.geocode"),col("geolocation")),2).cast(DoubleType()))

In [35]:
durham3 = durham2.where(durham2.distance<=1)

In [36]:
durham3.groupBy("ID").count().show()

22/03/20 23:05:36 WARN ExtractPythonUDFFromJoinCondition: The join condition:(round(cast(findDistance(_extract_geocode#43124, geolocation#241) as double), 2) <= 1.0) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
[Stage 105:>                                                        (0 + 1) / 1]

+------+-----+
|    ID|count|
+------+-----+
| 58358|    2|
|147244|   28|
| 57180|   32|
| 57655|    3|
| 58318|    4|
| 56081|   16|
|155004|   21|
| 99093|  191|
|180224|    1|
| 57048|   17|
| 76848|   14|
| 56656|    1|
| 56926|    1|
|148160|    1|
| 56685|   23|
| 80693|   12|
| 56171|   17|
| 57164|    3|
|170723|    6|
| 56849|   26|
+------+-----+
only showing top 20 rows



                                                                                