### Install and import PySpark and its dependencies

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [2 InRelease 0 B/3,626 B 0%] [Wa0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 2,589 B/88.7 k                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [2 InRelease gpg

### Construct the schema and load the DataFrame

In [2]:
# Explicit imports instead of `from pyspark.sql.types import *`, so every name
# used below has a visible origin and nothing else leaks into the namespace.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Column layout applied to challenge.csv: three string columns plus an integer
# byte counter. Supplying the schema avoids relying on type inference.
schema = StructType([
  StructField('ip_address', StringType()),
  StructField('Country', StringType()),
  StructField('domain_name', StringType()),
  StructField('bytes_used', IntegerType()),
])

# header=True tells the reader that the first line of the file is a header row,
# not data.
df = spark.read.csv("challenge.csv", header=True, schema=schema)
df.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      domain_name|bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

### Create a column flagging whether the country is Mexico

In [7]:
# Import only `when`: a wildcard import of pyspark.sql.functions rebinds Python
# built-ins such as sum, min, max, abs and round to Spark column functions for
# the rest of the notebook.
from pyspark.sql.functions import when

# Add a 'mexico' column: 'Yes' where Country equals 'Mexico', 'no' for every
# other row (rows with a null Country also fall through to 'no').
df = df.withColumn('mexico', when(df.Country == 'Mexico', 'Yes').otherwise('no'))
df.show()

+---------------+--------------+-----------------+----------+------+
|     ip_address|       Country|      domain_name|bytes_used|mexico|
+---------------+--------------+-----------------+----------+------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|    no|
| 119.239.207.13|         China|         youtu.be|        51|    no|
|  68.69.217.210|         China|        adobe.com|        10|    no|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|    no|
|   211.13.10.68|     Indonesia|          hud.gov|        29|    no|
|   239.80.21.97|      Suriname|       smh.com.au|       218|    no|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|    no|
| 127.242.24.138|         China| surveymonkey.com|       123|    no|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|    no|
|   237.54.11.63|         China|       amazon.com|        83|    no|
| 252.141.157.25|         Japan|      cornell.edu|       374|    no|
|185.220.128.248|       Belgium|  

### Sum the bytes used in Mexico versus all other countries

In [8]:
# Total bytes_used for each value of the 'mexico' flag column, i.e. one row for
# 'Yes' and one for 'no'.
import pyspark.sql.functions as sqlfunc

rows_by_flag = df.groupBy('mexico')
df1 = rows_by_flag.agg(sqlfunc.sum('bytes_used'))
df1.show()

+------+---------------+
|mexico|sum(bytes_used)|
+------+---------------+
|   Yes|           6293|
|    no|         508076|
+------+---------------+



### Count the number of distinct IP addresses for each country

In [10]:
# Number of distinct ip_address values seen per Country.
import pyspark.sql.functions as sqlfunc

# NOTE: despite the 'sum of ips' label, this column is a distinct count, not a sum.
distinct_ip_count = sqlfunc.countDistinct('ip_address').alias('sum of ips')
df1 = df.groupBy('Country').agg(distinct_ip_count)
df1.show()

+-----------+----------+
|    Country|sum of ips|
+-----------+----------+
|       Chad|         1|
|     Russia|        56|
|   Paraguay|         1|
|      Yemen|         1|
|     Sweden|        28|
|Philippines|        65|
|   Malaysia|         5|
|     Turkey|         1|
|     Malawi|         2|
|    Germany|         5|
|    Comoros|         1|
|Afghanistan|         5|
|     Rwanda|         1|
|      Sudan|         1|
|     France|        21|
|     Greece|         8|
|  Sri Lanka|         3|
|   Dominica|         1|
|  Argentina|        14|
|    Belgium|         1|
+-----------+----------+
only showing top 20 rows

