### There are cases when you have to parse data before loading into DataFrame
* Data is not of csv format.
* Further parsing of data elements is required
* ...


#### Typical workflow
1. read data as an RDD of text lines.
2. split each line into a list of elements.
3. parse each element (if needed)
4. convert each line into a **_sqlcontext Row_**, i.e. RDD of **_Rows_**
5. convert the RDD into a DataFrame


##### *Row* is a list of key/value pairs represented as
$ [ col_1=value_1, col_2=value_2, ..., col_n = value_n ]$

#### Example: Analyzing Web log data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("DataFrame Intro") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

In [2]:
from pyspark.sql import Row
log_file = sc.textFile("./data/log_file.txt")

In [3]:
import re

In [4]:
log_file.map(lambda x: re.sub('["\[\]]','',x)).take(1)

['141.243.1.172 29:23:53:25 GET /Software.html HTTP/1.0 200 1497']

In [5]:
def create_schema(row):
    token = row.split(' ')
    ip = token[0]
    date = token[1]
    request = token[2]
    url = token[3].split('?')[0]
    status = token[4]
    size = int(token[5]) if token[5].isnumeric() else 0 
  
    return Row(ip=ip, date=date, request=request, url=url, status=status, size=size)

In [6]:
row_data = log_file.map(lambda x: re.sub('["\[\]]','',x)).map(create_schema)
row_data.take(10)

[Row(date='29:23:53:25', ip='141.243.1.172', request='GET', size=200, status='HTTP/1.0', url='/Software.html'),
 Row(date='29:23:53:36', ip='query2.lycos.cs.cmu.edu', request='GET', size=200, status='HTTP/1.0', url='/Consumer.html'),
 Row(date='29:23:53:53', ip='tanuki.twics.com', request='GET', size=200, status='HTTP/1.0', url='/News.html'),
 Row(date='29:23:54:15', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/'),
 Row(date='29:23:54:16', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/icons/circle_logo_small.gif'),
 Row(date='29:23:54:18', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/logos/small_gopher.gif'),
 Row(date='29:23:54:19', ip='140.112.68.165', request='GET', size=200, status='HTTP/1.0', url='/logos/us-flag.gif'),
 Row(date='29:23:54:19', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/logos/small_ftp.gif'),
 Row(date='29:23:54:19', ip='wpbfl2-45.gate.net', requ

In [9]:
schema_DF = spark.createDataFrame(row_data)
schema_DF.createOrReplaceTempView('logs')
sample = spark.sql('SELECT * FROM logs LIMIT 10').show(10)  # returns list


+-----------+--------------------+-------+----+--------+--------------------+
|       date|                  ip|request|size|  status|                 url|
+-----------+--------------------+-------+----+--------+--------------------+
|29:23:53:25|       141.243.1.172|    GET| 200|HTTP/1.0|      /Software.html|
|29:23:53:36|query2.lycos.cs.c...|    GET| 200|HTTP/1.0|      /Consumer.html|
|29:23:53:53|    tanuki.twics.com|    GET| 200|HTTP/1.0|          /News.html|
|29:23:54:15|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|                   /|
|29:23:54:16|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|/icons/circle_log...|
|29:23:54:18|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|/logos/small_goph...|
|29:23:54:19|      140.112.68.165|    GET| 200|HTTP/1.0|  /logos/us-flag.gif|
|29:23:54:19|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|/logos/small_ftp.gif|
|29:23:54:19|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|     /icons/book.gif|
|29:23:54:19|  wpbfl2-45.gate.net|    GET| 200|HTTP/1.0|  /logos

In [8]:
schema_DF.take(10)

[Row(date='29:23:53:25', ip='141.243.1.172', request='GET', size=200, status='HTTP/1.0', url='/Software.html'),
 Row(date='29:23:53:36', ip='query2.lycos.cs.cmu.edu', request='GET', size=200, status='HTTP/1.0', url='/Consumer.html'),
 Row(date='29:23:53:53', ip='tanuki.twics.com', request='GET', size=200, status='HTTP/1.0', url='/News.html'),
 Row(date='29:23:54:15', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/'),
 Row(date='29:23:54:16', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/icons/circle_logo_small.gif'),
 Row(date='29:23:54:18', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/logos/small_gopher.gif'),
 Row(date='29:23:54:19', ip='140.112.68.165', request='GET', size=200, status='HTTP/1.0', url='/logos/us-flag.gif'),
 Row(date='29:23:54:19', ip='wpbfl2-45.gate.net', request='GET', size=200, status='HTTP/1.0', url='/logos/small_ftp.gif'),
 Row(date='29:23:54:19', ip='wpbfl2-45.gate.net', requ

In [None]:
schema_DF.printSchema()

In [None]:
schema_DF.show()

### Most visited URLs?

In [None]:
schema_DF.groupBy('url').count().orderBy('count', ascending=False).show()


In [None]:
url_access = spark.sql('''SELECT url, count(*) as counts FROM logs GROUP BY url
  ORDER BY counts DESC LIMIT 10''')#.collect()

url_access.show()
#for row in url_access:
#    print row
    


### Most frequent visitors?

In [None]:
schema_DF.groupBy('ip').count().orderBy('count', ascending=False).show()

In [None]:
visitors = spark.sql('''SELECT ip, count(*) as counts FROM logs GROUP BY ip
  ORDER BY counts DESC LIMIT 10''').collect()
for row in visitors:
    print(row)

In [None]:
td = schema_DF.select("ip","size").filter(schema_DF.size > 3000).groupBy('ip').count().orderBy('count', ascending=False)#.show()
td.show()

#### Can we change order of operations?

In [None]:
td = schema_DF.filter(schema_DF.time > 3000).select('ip','time').groupBy('ip').count().orderBy('count', ascending=False)
td.show()

#### How to count visitors by _.com_, _.net_, etc?

In [None]:
tdrdd = td.rdd
tdrdd.take(10)