In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [2]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [6]:
my_schema = T.StructType([
        T.StructField("ip_address",T.StringType()),
        T.StructField("country",T.StringType()),
        T.StructField("domain_name",T.StringType()),
        T.StructField("bytes_used",T.IntegerType())
    ]
)

In [7]:
df=spark.read.csv("challenge.csv", header=True,schema=my_schema)
df.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       country|      domain_name|bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [8]:
df.dtypes

[('ip_address', 'string'),
 ('country', 'string'),
 ('domain_name', 'string'),
 ('bytes_used', 'int')]

In [11]:
df = df.withColumn("is_Mexico",F.when(df.country.like("_exico"),"Yes").otherwise("No"))

In [12]:
df.groupBy(df.is_Mexico).agg(F.sum(df.bytes_used).alias("bytes_used_sum")).show()

+---------+--------------+
|is_Mexico|bytes_used_sum|
+---------+--------------+
|       No|        508076|
|      Yes|          6293|
+---------+--------------+



In [26]:
df.groupBy(df.country).agg(F.countDistinct(df.ip_address).alias("distinct_ips")).sort(F.col("distinct_ips").desc()).show()

+--------------+------------+
|       country|distinct_ips|
+--------------+------------+
|         China|         172|
|     Indonesia|         114|
|   Philippines|          65|
|        Russia|          56|
|        Brazil|          35|
|        Poland|          31|
|        Sweden|          28|
|         Japan|          25|
|Czech Republic|          23|
|      Portugal|          23|
|        France|          21|
|          Peru|          19|
|      Colombia|          17|
| United States|          15|
|     Argentina|          14|
|       Ukraine|          14|
|        Mexico|          13|
|      Thailand|          12|
|       Nigeria|          11|
|        Canada|          11|
+--------------+------------+
only showing top 20 rows



In [21]:
df.groupBy(df.domain_name).agg(F.countDistinct(df.domain_name).alias("count")).show()

+-----------------+-----+
|      domain_name|count|
+-----------------+-----+
|    xinhuanet.com|    1|
|   whitehouse.gov|    1|
|     illinois.edu|    1|
|     bluehost.com|    1|
|      blogger.com|    1|
|independent.co.uk|    1|
|      youtube.com|    1|
|       elpais.com|    1|
|    cafepress.com|    1|
|    artisteer.com|    1|
|         etsy.com|    1|
|   indiatimes.com|    1|
|       ebay.co.uk|    1|
|       disqus.com|    1|
|      samsung.com|    1|
|          tiny.cc|    1|
|deliciousdays.com|    1|
|             t.co|    1|
|         1und1.de|    1|
|    bigcartel.com|    1|
+-----------------+-----+
only showing top 20 rows

