## Analytics con DataFrames sobre datos semi estructurados

In [0]:
import urllib

with urllib.request.urlopen('https://github.com/juanpampliega/datasets/raw/master/http_access_200304.log.gz') as response:
  gzipcontent = response.read()

with open("/tmp/test_http_access_log.gz", 'wb') as f:
  f.write(gzipcontent)

dbutils.fs.cp("file:/tmp/test_http_access_log.gz", "/tmp/")

In [0]:
access_logs = sc.textFile("dbfs:/tmp/test_http_access_log.gz")
access_logs.take(5)

In [0]:
import re
import json

def parse_access_log_line(line):
  
  format_pat= re.compile( 
      r"(?P<host>[\d\.]+)\s" 
      r"(?P<identity>\S*)\s" 
      r"(?P<user>\S*)\s"
      r"\[(?P<time>.*?)\]\s"
      r'"(?P<request>.*?)"\s'
      r"(?P<status>\d+)\s"
      r"(?P<bytes>\S*)\s"
      r'"(?P<referer>.*?)"\s' # [SIC]
      r'"(?P<user_agent>.*?)"\s*' 
  )

  match = format_pat.match(line)
  return json.dumps(match.groupdict())


In [0]:
access_logs = sc.textFile("dbfs:/tmp/test_http_access_log.gz")
access_logs_json = access_logs.map(parse_access_log_line)

access_logs_json.take(5)


In [0]:
logs = spark.read.json(access_logs_json)

logs.printSchema()
logs.show()

logs.createOrReplaceTempView("logs")

In [0]:
%sql
SELECT
  time,
  unix_timestamp(substring(time,0,11), 'dd/MMM/yyyy'),
  from_unixtime(unix_timestamp(substring(time,0,11), 'dd/MMM/yyyy'),"yyyy-MM-dd")
FROM logs
limit 7

time,"unix_timestamp(substring(time, 0, 11), dd/MMM/yyyy)","from_unixtime(unix_timestamp(substring(time, 0, 11), dd/MMM/yyyy), yyyy-MM-dd)"
05/Apr/2003:00:17:25 -0800,1049500800,2003-04-05
06/Apr/2003:13:24:39 -0700,1049587200,2003-04-06
09/Apr/2003:20:51:30 -0700,1049846400,2003-04-09
09/Apr/2003:23:45:49 -0700,1049846400,2003-04-09
10/Apr/2003:00:00:00 -0700,1049932800,2003-04-10
10/Apr/2003:00:00:05 -0700,1049932800,2003-04-10
10/Apr/2003:00:00:07 -0700,1049932800,2003-04-10


In [0]:
sql = sqlContext.sql("""
SELECT 
  from_unixtime(unix_timestamp(substring(time,0,11), 'dd/MMM/yyyy'),"yyyy-MM-dd") as day,
  status,
  count(1) as mount
FROM logs
GROUP BY from_unixtime(unix_timestamp(substring(time,0,11), 'dd/MMM/yyyy'),"yyyy-MM-dd"), status
ORDER BY 1, 2
""")
sql.show()

display(sql)

day,status,mount
2003-04-05,200,1
2003-04-06,200,1
2003-04-09,200,2
2003-04-10,200,3613
2003-04-10,206,21
2003-04-10,301,11
2003-04-10,302,135
2003-04-10,304,473
2003-04-10,401,1
2003-04-10,403,21


In [0]:
print(logs)
statuses = logs.groupBy("status").count().orderBy("count")
display(statuses)


status,count
400,1
405,4
401,17
301,385
206,1518
403,1913
404,4202
304,14030
302,16471
200,83331


In [0]:
files = ['ipligence-lite.csv', 'http_access_200304.log.gz', 'http_access_200306.log.gz', 'http_access_200307.log.gz']

In [0]:
def download_file(file):
  with urllib.request.urlopen('https://github.com/juanpampliega/datasets/raw/master/{f}'.format(f=file)) as response:
    gzipcontent = response.read()

  with open('/dbfs/tmp/{f}'.format(f=file), 'wb') as f:
    f.write(gzipcontent)

In [0]:
for f in files:
  download_file(f)

In [0]:
logs2 = sc.textFile("dbfs:/tmp/http_access_2003*")
records = logs2.map(parse_access_log_line)

records.take(2)

In [0]:
all_logs = spark.read.json(records)

all_logs.printSchema()

all_logs.createOrReplaceTempView("all_logs")

In [0]:
geo_ip = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('dbfs:/tmp/ipligence-lite.csv')

geo_ip.createOrReplaceTempView("geo_ips")

display(geo_ip)

from_ip,to_ip,country_iso,country,region,region_name
0,16777215,,,,
16777216,16777471,AU,AUSTRALIA,OC,OCEANIA
16777472,16778239,CN,CHINA,AS,ASIA
16778240,16779263,AU,AUSTRALIA,OC,OCEANIA
16779264,16781311,CN,CHINA,AS,ASIA
16781312,16785407,JP,JAPAN,AS,ASIA
16785408,16793599,CN,CHINA,AS,ASIA
16793600,16809983,JP,JAPAN,AS,ASIA
16809984,16842751,TH,THAILAND,AS,ASIA
16842752,16843007,CN,CHINA,AS,ASIA


In [0]:
%scala
def ipToNumber(ipAddr: String): Long = {
  try {
    val parts = ipAddr.split("\\.")
    parts(3).toLong + (parts(2).toLong * 256L) + (parts(1).toLong * 256L * 256L) + (parts(0).toLong * 256L * 256L * 256L)
  } catch {
    case e: Exception => {
      e.printStackTrace
      0
    }
  }
}

sqlContext.udf.register("INET_ATON", (ip:String) => ipToNumber(ip))

In [0]:
%sql CACHE TABLE tbl_ip_country AS
SELECT all_logs.host, geo_ips.country_iso
FROM all_logs INNER JOIN geo_ips
    ON 
    geo_ips.from_ip <= INET_ATON(all_logs.host) AND 
    geo_ips.to_ip >= INET_ATON(all_logs.host)
--LIMIT 100

host,country_iso
129.118.191.108,US
62.31.206.207,GB
205.188.209.43,US
24.186.80.132,US
69.10.137.199,CA
63.200.51.4,US
207.175.242.183,US
65.214.36.115,US
208.61.124.145,US
145.145.6.14,NL


In [0]:
%sql 
SELECT country_iso, COUNT(1) AS count 
FROM tbl_ip_country
GROUP BY country_iso
ORDER BY count DESC
LIMIT 8

country_iso,count
US,58
NL,11
CA,10
SE,10
GB,2
AU,2
IT,1
SA,1


In [0]:
# https://en.wikipedia.org/wiki/ISO_3166-1_alpha-3

In [0]:
%sql 
SELECT 
 CASE WHEN country_iso = 'US' THEN 'USA'
      WHEN country_iso = 'NL' THEN 'NLD'
      WHEN country_iso = 'CA' THEN 'CAN'
      WHEN country_iso = 'SE' THEN 'SRB'
      WHEN country_iso = 'GB' THEN 'GBR'
      WHEN country_iso = 'AU' THEN 'AUS'
      WHEN country_iso = 'IT' THEN 'ITA'
      WHEN country_iso = 'SA' THEN 'ZAF'
      ELSE NULL END as country_iso, 
COUNT(1) AS count 
FROM tbl_ip_country
GROUP BY country_iso
ORDER BY count DESC
LIMIT 8

country_iso,count
USA,58
NLD,11
CAN,10
SRB,10
GBR,2
AUS,2
ITA,1
ZAF,1
