In [99]:
import os

# Pin the JDK and the Spark distribution this notebook was written against:
# a Java 8 JDK and Spark 2.3.1 (SPARK_HOME is relative to the notebook's
# working directory). These are set before findspark/pyspark are imported,
# presumably so they pick these installs up.
os.environ.update({
    'JAVA_HOME': "/usr/lib/jvm/java-8-openjdk-amd64",
    'SPARK_HOME': "./spark-2.3.1-bin-hadoop2.7",
})

In [100]:
import os

import findspark

# Make the pyspark bundled with SPARK_HOME importable. findspark.init() with no
# argument falls back to the SPARK_HOME environment variable (per findspark's
# documented lookup); passing it explicitly just makes that dependency visible.
findspark.init(os.environ.get('SPARK_HOME'))

In [101]:
import pyspark
import pyspark.sql.functions as sqlfunc
from pyspark.sql import SparkSession
# Explicit names instead of `from pyspark.sql.types import *`: these four are the
# only type classes the schema needs, and a star import hides where names come from.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Reuse the running session if there is one, otherwise start a local one.
spark = SparkSession.builder.getOrCreate()
spark

In [102]:
# Explicit schema: three string columns plus an integer byte count. With a
# schema supplied, header=True only skips the CSV header row; column names
# come from this schema.
myschema = StructType(
    [StructField(name, StringType()) for name in ('ip_address', 'Country', 'Domain Name')]
    + [StructField('Bytes_used', IntegerType())]
)

# 'Domain Name' contains a space; rename it so it works as an attribute
# (df.Domain_Name) and as an unquoted identifier in SQL.
df = (
    spark.read.csv("challenge.csv", header=True, schema=myschema)
    .withColumnRenamed('Domain Name', 'Domain_Name')
)
df.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      Domain_Name|Bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [103]:
# (column name, Spark SQL type) pairs; confirms Bytes_used was read as int.
column_types = df.dtypes
column_types

[('ip_address', 'string'),
 ('Country', 'string'),
 ('Domain_Name', 'string'),
 ('Bytes_used', 'int')]

In [104]:
# count / mean / stddev / min / max per column (mean and stddev are null for strings).
summary_stats = df.describe()
summary_stats.show()

+-------+------------+-----------+-------------+------------------+
|summary|  ip_address|    Country|  Domain_Name|        Bytes_used|
+-------+------------+-----------+-------------+------------------+
|  count|        1000|       1000|         1000|              1000|
|   mean|        null|       null|         null|           514.369|
| stddev|        null|       null|         null|287.66708047357156|
|    min|0.23.109.109|Afghanistan|123-reg.co.uk|                 1|
|    max|99.73.193.76|     Zambia|   zimbio.com|              1000|
+-------+------------+-----------+-------------+------------------+



In [105]:
# Column names after the Domain_Name rename.
column_names = df.columns
column_names

['ip_address', 'Country', 'Domain_Name', 'Bytes_used']

In [106]:
# Total number of rows loaded from the CSV.
n_rows = df.count()
n_rows

1000

In [107]:
# Rows unique across all columns; equal to the total row count means no exact duplicates.
n_distinct_rows = df.distinct().count()
n_distinct_rows

1000

In [108]:
# dropna() is the DataFrame-level alias of df.na.drop(): drop rows containing any null.
df_dropped = df.dropna()
df_dropped.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      Domain_Name|Bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [109]:
# Rows with a missing ip_address.
df_null_ip_address = df.where(sqlfunc.col('ip_address').isNull())
df_null_ip_address.show()

+----------+-------+-----------+----------+
|ip_address|Country|Domain_Name|Bytes_used|
+----------+-------+-----------+----------+
+----------+-------+-----------+----------+



In [110]:
# drop_duplicates() is the snake_case alias of dropDuplicates(); with no subset
# it compares whole rows.
df_no_duplicates = df.drop_duplicates()
df_no_duplicates.count()

1000

In [111]:
# Keep only the country and the byte count.
df_selected = df.select(df.Country, df.Bytes_used)
df_selected.show()

+--------------+----------+
|       Country|Bytes_used|
+--------------+----------+
|         China|       463|
|         China|        51|
|         China|        10|
|      Bulgaria|       853|
|     Indonesia|        29|
|      Suriname|       218|
|       Jamaica|        95|
|         China|       123|
|Czech Republic|       322|
|         China|        83|
|         Japan|       374|
|       Belgium|       389|
|   Afghanistan|       282|
|     Indonesia|       726|
|Czech Republic|       657|
|     Indonesia|       517|
|        Mexico|       877|
|       Croatia|       287|
|      Thailand|       588|
|      Thailand|       114|
+--------------+----------+
only showing top 20 rows



In [112]:
# Country plus the byte count under the shorter name "Bytes". Built from `df`
# rather than by re-assigning `df_selected` from itself, so this cell does not
# depend on whatever `df_selected` currently holds from other cells.
df_selected = df.select('Country', sqlfunc.col('Bytes_used').alias('Bytes'))
df_selected.show()

+--------------+-----+
|       Country|Bytes|
+--------------+-----+
|         China|  463|
|         China|   51|
|         China|   10|
|      Bulgaria|  853|
|     Indonesia|   29|
|      Suriname|  218|
|       Jamaica|   95|
|         China|  123|
|Czech Republic|  322|
|         China|   83|
|         Japan|  374|
|       Belgium|  389|
|   Afghanistan|  282|
|     Indonesia|  726|
|Czech Republic|  657|
|     Indonesia|  517|
|        Mexico|  877|
|       Croatia|  287|
|      Thailand|  588|
|      Thailand|  114|
+--------------+-----+
only showing top 20 rows



In [113]:
# All rows from Thailand (where() is an alias of filter()).
df_filtered = df.where(sqlfunc.col('Country') == 'Thailand')
df_filtered.show()

+---------------+--------+-------------------+----------+
|     ip_address| Country|        Domain_Name|Bytes_used|
+---------------+--------+-------------------+----------+
|    81.71.28.97|Thailand|            last.fm|       588|
|  9.255.129.184|Thailand|            mtv.com|       114|
|   33.93.113.70|Thailand|          goo.ne.jp|       527|
|   111.9.45.159|Thailand|cargocollective.com|       974|
|   38.121.13.55|Thailand|        yahoo.co.jp|        48|
| 236.194.113.84|Thailand|         eepurl.com|       108|
| 206.27.230.110|Thailand|          globo.com|        54|
|   9.119.201.93|Thailand|             cbc.ca|       755|
|149.212.121.228|Thailand|       freewebs.com|       936|
|  156.15.114.21|Thailand|      bloglovin.com|       462|
|  17.98.227.131|Thailand|      cafepress.com|       847|
| 56.170.142.166|Thailand|            mapy.cz|       771|
+---------------+--------+-------------------+----------+



In [114]:
# Domains containing the substring "yahoo" (same rows as LIKE '%yahoo%').
df_filtered = df.filter(df.Domain_Name.contains('yahoo'))
df_filtered.show()

+---------------+--------------+-----------+----------+
|     ip_address|       Country|Domain_Name|Bytes_used|
+---------------+--------------+-----------+----------+
|145.211.141.165|Czech Republic|  yahoo.com|       917|
|207.246.207.115|      Portugal|yahoo.co.jp|       494|
|   38.121.13.55|      Thailand|yahoo.co.jp|        48|
+---------------+--------------+-----------+----------+



In [115]:
# Japanese domains. Match the ".jp" suffix including the dot, so names that
# merely end in the letters "jp" are not picked up.
df_filtered = df.filter(df.Domain_Name.endswith('.jp'))
df_filtered.show()

+---------------+--------------------+--------------+----------+
|     ip_address|             Country|   Domain_Name|Bytes_used|
+---------------+--------------------+--------------+----------+
|     99.2.6.139|      Czech Republic|  geocities.jp|       322|
| 83.130.135.105|Palestinian Terri...|        i2i.jp|       222|
|   33.93.113.70|            Thailand|     goo.ne.jp|       527|
|  45.253.75.255|           Australia|  hatena.ne.jp|       784|
| 171.83.105.111|               China|   shop-pro.jp|       210|
| 82.101.222.230|              France|     ameblo.jp|       573|
|  58.242.19.151|              Brazil|  amazon.co.jp|       261|
|207.246.207.115|            Portugal|   yahoo.co.jp|       494|
|    3.3.143.240|               China|     ameblo.jp|       624|
|   38.121.13.55|            Thailand|   yahoo.co.jp|        48|
|  92.75.110.241|             Nigeria| rakuten.co.jp|       645|
|177.137.179.144|               China|  google.co.jp|       553|
| 155.191.132.94|        

In [116]:
# Domains beginning with "google" (google.es, google.co.uk, ...).
df_filtered = df.where(df.Domain_Name.like('google%'))
df_filtered.show()

+---------------+-------------+-------------+----------+
|     ip_address|      Country|  Domain_Name|Bytes_used|
+---------------+-------------+-------------+----------+
|    4.68.10.108|   Luxembourg|google.com.br|       714|
|200.223.169.213|        China|    google.es|       812|
| 88.128.235.155|      Nigeria|google.com.au|       977|
|242.114.173.239|       Russia|    google.fr|       576|
|  26.241.29.104|    Indonesia|    google.ru|       118|
|  29.135.81.252|        Japan|google.com.hk|       712|
|177.137.179.144|        China| google.co.jp|       553|
|115.125.231.241|         Mali|    google.es|       692|
|  48.219.231.50|       Brazil|    google.nl|       805|
|   207.47.1.181|        Chile| google.co.uk|       530|
| 63.183.213.112|    Indonesia|    google.es|       260|
|  6.139.183.106|    Indonesia| google.co.jp|       428|
|208.240.139.238|      Vietnam|    google.de|       180|
| 24.217.136.187|     Honduras| google.co.uk|       265|
|    64.159.2.86|United States|

In [117]:
# Inclusive range 100 <= Bytes_used <= 200 (what between() means).
df_filtered = df.filter((df.Bytes_used >= 100) & (df.Bytes_used <= 200))
df_filtered.show()

+---------------+-------------+----------------+----------+
|     ip_address|      Country|     Domain_Name|Bytes_used|
+---------------+-------------+----------------+----------+
| 127.242.24.138|        China|surveymonkey.com|       123|
|  9.255.129.184|     Thailand|         mtv.com|       114|
|  190.58.76.137|      Georgia|      forbes.com|       179|
|    80.54.8.201|       Canada|      unicef.org|       190|
|   66.116.15.48|   Uzbekistan|      forbes.com|       181|
| 145.55.140.212|    Macedonia|       hexun.com|       173|
|  121.5.177.135|    Indonesia|       cam.ac.uk|       162|
|  97.86.218.248|       Cyprus|        youtu.be|       140|
| 37.138.177.130|       France|    bandcamp.com|       101|
|  88.180.97.194|     Dominica|  craigslist.org|       161|
|  167.149.73.18|      Ukraine| hugedomains.com|       113|
|228.102.179.239|        China|        fema.gov|       126|
| 79.181.183.168|       France|         bbb.org|       180|
|  26.10.139.188|       Brazil|     harv

In [118]:
# Rows from any of the listed countries; isin() also accepts a single list.
target_countries = ['Brazil', 'United States']
df_filtered = df.filter(df.Country.isin(target_countries))
df_filtered.show()

+---------------+-------------+---------------+----------+
|     ip_address|      Country|    Domain_Name|Bytes_used|
+---------------+-------------+---------------+----------+
|   6.151.24.157|       Brazil|        fda.gov|       681|
|  130.18.50.132|       Brazil|      alexa.com|       342|
|155.211.189.235|United States|      globo.com|       967|
|  185.75.79.171|United States|        who.int|       709|
| 169.17.113.145|United States|       ucla.edu|        73|
|  234.78.226.71|       Brazil|        bbb.org|       290|
|  58.242.19.151|       Brazil|   amazon.co.jp|       261|
|244.157.229.246|United States|  discovery.com|       779|
|  220.173.47.44|       Brazil|  wordpress.com|       674|
|  252.28.12.113|       Brazil|  earthlink.net|       681|
| 26.225.202.178|       Brazil|   redcross.org|       898|
|243.114.149.195|       Brazil|      house.gov|        74|
| 254.65.122.193|United States|     utexas.edu|       441|
|  26.10.139.188|       Brazil|    harvard.edu|       19

In [119]:
# Last three characters of each country name; a negative start position counts
# from the end of the string.
df_substr = df.select(
    'Country',
    sqlfunc.substring('Country', -3, 3).alias('last_three_wd'),
)
df_substr.show()

+--------------+-------------+
|       Country|last_three_wd|
+--------------+-------------+
|         China|          ina|
|         China|          ina|
|         China|          ina|
|      Bulgaria|          ria|
|     Indonesia|          sia|
|      Suriname|          ame|
|       Jamaica|          ica|
|         China|          ina|
|Czech Republic|          lic|
|         China|          ina|
|         Japan|          pan|
|       Belgium|          ium|
|   Afghanistan|          tan|
|     Indonesia|          sia|
|Czech Republic|          lic|
|     Indonesia|          sia|
|        Mexico|          ico|
|       Croatia|          tia|
|      Thailand|          and|
|      Thailand|          and|
+--------------+-------------+
only showing top 20 rows



In [120]:
# Brazil / United States rows that used more than 200 bytes.
in_target_countries = df.Country.isin('Brazil', 'United States')
over_200_bytes = df.Bytes_used > 200
multi_filtered_df = df.where(in_target_countries & over_200_bytes)
multi_filtered_df.show()

+---------------+-------------+---------------+----------+
|     ip_address|      Country|    Domain_Name|Bytes_used|
+---------------+-------------+---------------+----------+
|   6.151.24.157|       Brazil|        fda.gov|       681|
|  130.18.50.132|       Brazil|      alexa.com|       342|
|155.211.189.235|United States|      globo.com|       967|
|  185.75.79.171|United States|        who.int|       709|
|  234.78.226.71|       Brazil|        bbb.org|       290|
|  58.242.19.151|       Brazil|   amazon.co.jp|       261|
|244.157.229.246|United States|  discovery.com|       779|
|  220.173.47.44|       Brazil|  wordpress.com|       674|
|  252.28.12.113|       Brazil|  earthlink.net|       681|
| 26.225.202.178|       Brazil|   redcross.org|       898|
| 254.65.122.193|United States|     utexas.edu|       441|
|   122.50.11.94|       Brazil|    comsenz.com|       761|
|176.220.157.207|       Brazil|liveinternet.ru|       706|
| 204.124.186.40|       Brazil|     tripod.com|       97

In [121]:
# Expose df to Spark SQL as the temporary view "challenge".
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is
# its replacement and likewise replaces any existing view of that name.
df.createOrReplaceTempView('challenge')
query1 = spark.sql('select * from challenge')
query1.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      Domain_Name|Bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [122]:
# For each Thailand row: the country joined with "-" to the upper-cased last two
# characters of the domain, plus Bytes_used.
# NOTE(review): RIGHT(Domain_Name, 2) is not the TLD; ".com" domains yield "OM".
query2 = spark.sql("""
    SELECT
        CONCAT_WS('-', Country, UPPER(RIGHT(Domain_Name, 2))) AS Name,
        Bytes_used
    FROM challenge
    WHERE Country = 'Thailand'
""")

query2.show()

+-----------+----------+
|       Name|Bytes_used|
+-----------+----------+
|Thailand-FM|       588|
|Thailand-OM|       114|
|Thailand-JP|       527|
|Thailand-OM|       974|
|Thailand-JP|        48|
|Thailand-OM|       108|
|Thailand-OM|        54|
|Thailand-CA|       755|
|Thailand-OM|       936|
|Thailand-OM|       462|
|Thailand-OM|       847|
|Thailand-CZ|       771|
+-----------+----------+



In [123]:
# Add Kilobytes_used = Bytes_used / 1024, rounded to 2 decimals.
# Uses sqlfunc.round: no cell imports a bare `round`, and Python's builtin
# round() cannot operate on a Spark Column.
df_with_calcu_column = df.withColumn(
    'Kilobytes_used', sqlfunc.round(df.Bytes_used / 1024, 2)
)
df_with_calcu_column.show()

+---------------+--------------+-----------------+----------+--------------+
|     ip_address|       Country|      Domain_Name|Bytes_used|Kilobytes_used|
+---------------+--------------+-----------------+----------+--------------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|          0.45|
| 119.239.207.13|         China|         youtu.be|        51|          0.05|
|  68.69.217.210|         China|        adobe.com|        10|          0.01|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|          0.83|
|   211.13.10.68|     Indonesia|          hud.gov|        29|          0.03|
|   239.80.21.97|      Suriname|       smh.com.au|       218|          0.21|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|          0.09|
| 127.242.24.138|         China| surveymonkey.com|       123|          0.12|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|          0.31|
|   237.54.11.63|         China|       amazon.com|        83|          0.08|

In [134]:
# Per-country byte statistics: total, average (2 decimals), min and max.
# sqlfunc.round replaces the bare `round`, which is not imported by any cell
# and whose builtin version cannot round a Spark Column.
df_grouped = df.groupBy('Country').agg(
    sqlfunc.sum('Bytes_used').alias('total'),
    sqlfunc.round(sqlfunc.avg('Bytes_used'), 2).alias('avg'),
    sqlfunc.min('Bytes_used').alias('min'),
    sqlfunc.max('Bytes_used').alias('max'),
)

# Countries with the most traffic first.
df_grouped.sort('total', ascending=False).show()

+--------------+-----+------+---+----+
|       Country|total|   avg|min| max|
+--------------+-----+------+---+----+
|         China|83398|484.87| 10| 988|
|     Indonesia|57461|504.04|  4|1000|
|        Russia|32193|574.88|  1| 967|
|   Philippines|30338|466.74| 10| 960|
|        Brazil|19047| 544.2| 74| 979|
|        Poland|16549|533.84|  7| 954|
|        Sweden|15422|550.79|  9| 935|
|      Portugal|13197|573.78| 30| 971|
|Czech Republic|12758| 554.7| 68| 986|
|         Japan|12432|497.28| 40| 991|
|        France|10559|502.81| 49| 968|
|          Peru| 9980|525.26| 10| 921|
|     Argentina| 8574|612.43| 25| 990|
|       Ukraine| 8483|605.93| 44| 941|
|      Colombia| 7215|424.41| 34| 925|
| United States| 7080| 472.0| 38| 967|
|        Canada| 6658|605.27| 61| 963|
|        Mexico| 6293|484.08| 20| 877|
|      Thailand| 6184|515.33| 48| 974|
|       Nigeria| 6097|554.27| 52| 977|
+--------------+-----+------+---+----+
only showing top 20 rows



In [137]:
# Optional export of the per-country aggregates in three formats. Left commented
# out so running the notebook does not write files; uncomment a line to export.
# NOTE(review): Spark's default save mode presumably refuses to overwrite an
# existing path; add .mode('overwrite') if re-runs are expected — confirm.
# df_grouped.write.json('json_result_file.json')
# df_grouped.write.csv('csv_result_file.csv')
# df_grouped.write.parquet('parquet_result_file.parquet')

                                                                                

# Desafio:
### 1. Crie uma nova coluna que identifique se a linha (row) é do México

In [139]:
# Challenge 1: flag each row as 'yes' (Mexico) or 'no'.
# `when` is taken from sqlfunc because no cell imports a bare `when`; the
# original only ran thanks to leftover kernel state.
df = df.withColumn(
    "Is_Mexico", sqlfunc.when(df.Country == 'Mexico', 'yes').otherwise('no')
)
df.show()

+---------------+--------------+-----------------+----------+---------+
|     ip_address|       Country|      Domain_Name|Bytes_used|Is_Mexico|
+---------------+--------------+-----------------+----------+---------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|       no|
| 119.239.207.13|         China|         youtu.be|        51|       no|
|  68.69.217.210|         China|        adobe.com|        10|       no|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|       no|
|   211.13.10.68|     Indonesia|          hud.gov|        29|       no|
|   239.80.21.97|      Suriname|       smh.com.au|       218|       no|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|       no|
| 127.242.24.138|         China| surveymonkey.com|       123|       no|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|       no|
|   237.54.11.63|         China|       amazon.com|        83|       no|
| 252.141.157.25|         Japan|      cornell.edu|       374|   

### 2. Agrupe pela nova coluna e some os valores de Bytes_used

In [145]:
# Challenge 2: total Bytes_used per Is_Mexico flag ("yes" listed first).
# (Identifier spelling fixed: challeng_result_2 -> challenge_result_2.)
challenge_result_2 = df.groupBy('Is_Mexico').agg(sqlfunc.sum('Bytes_used').alias('total'))
challenge_result_2.sort('Is_Mexico', ascending=False).show()

+---------+------+
|Is_Mexico| total|
+---------+------+
|      yes|  6293|
|       no|508076|
+---------+------+



### 3. Agrupe pela coluna Country e use a função sqlfunc.countDistinct para calcular o número de IPs distintos em cada país

In [146]:
# Challenge 3: number of distinct IP addresses per country, most first.
# The column is referenced exactly as named in the schema ('ip_address').
# 'Ip_address' only resolved because Spark's column matching is
# case-insensitive by default (spark.sql.caseSensitive=false).
challenge_result_3 = df.groupBy("Country").agg(
    sqlfunc.countDistinct('ip_address').alias('qnt')
)
challenge_result_3.sort('qnt', ascending=False).show()



+--------------+---+
|       Country|qnt|
+--------------+---+
|         China|172|
|     Indonesia|114|
|   Philippines| 65|
|        Russia| 56|
|        Brazil| 35|
|        Poland| 31|
|        Sweden| 28|
|         Japan| 25|
|Czech Republic| 23|
|      Portugal| 23|
|        France| 21|
|          Peru| 19|
|      Colombia| 17|
| United States| 15|
|       Ukraine| 14|
|     Argentina| 14|
|        Mexico| 13|
|      Thailand| 12|
|       Nigeria| 11|
|        Canada| 11|
+--------------+---+
only showing top 20 rows



                                                                                