In [10]:
# `spark` and `sc` are supplied by the notebook environment; nothing in this notebook creates them.
# Only the last expression of a cell is echoed, so the output below is the repr of `sc`.
spark # the Spark session
sc # the Spark context

<pyspark.context.SparkContext at 0x7f41c005a990>

## Important URLs

- [Spark Web UI](http://localhost:4040/)
- [Spark Master UI](http://localhost:8080/)


In [1]:
from pyspark.sql import Row
from pyspark.sql.types import *

Note which directory the notebook is running in, since `metastore_db` will be created there. Only one Spark application can use a given directory at a time, because `metastore_db` is locked while in use.
The variables `spark` and `sc` are imported automatically.

In [2]:
!pwd
!ls
print sc.master
spark.createDataFrame([], StructType([])) #force createDataFrame to create metastore_db and derby.log
!ls

/root/scripts/notebooks/n01
example01.ipynb  gen_test_data.py  test_data.txt
spark://2da1f8fedff5:7077
derby.log  example01.ipynb  gen_test_data.py  metastore_db  test_data.txt


## Reference
Check out the [Spark SQL Programming Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html)

In [3]:
# Absolute path of the input text file; parse_line below splits each of its lines on commas.
filename = '/root/scripts/notebooks/n01/test_data.txt'

def parse_line(line):
    """Convert one comma-separated text line into a Row with `key` and `value`.

    The line is split on commas; the first piece becomes `key` and the second
    becomes `value`, each with surrounding whitespace removed. Pieces after the
    second are ignored. A line without a comma raises IndexError.
    """
    fields = line.split(',')
    key, value = fields[0].strip(), fields[1].strip()
    return Row(key=key, value=value)

# Read the input file as an RDD of text lines and turn each line into a Row.
lines_rdd = (
    sc.textFile(filename)
      .map(parse_line)
)

In [4]:
# Peek at the first two parsed records to confirm each line became a Row(key=..., value=...).
lines_rdd.take(2)

[Row(key=u'key_8738', value=u'value_298'),
 Row(key=u'key_8958', value=u'value_837')]

In [5]:
def get_schema():
    """Build the explicit schema for the key/value DataFrame.

    Returns:
        StructType with two non-nullable string columns, `key` and `value`,
        in that order.
    """
    key_field = StructField('key', StringType(), nullable=False)
    # This field was previously also named 'key', giving the schema two
    # columns with the same name and no `value` column.
    value_field = StructField('value', StringType(), nullable=False)
    return StructType([key_field, value_field])

schema = get_schema()
# Apply the schema explicitly rather than letting Spark infer it from the Rows;
# `schema` used to be built but never passed to createDataFrame.
key_values = spark.createDataFrame(lines_rdd, schema)

In [6]:
# Print the first two rows of the DataFrame as a quick check of its columns and contents.
key_values.show(2)

+--------+---------+
|     key|    value|
+--------+---------+
|key_8738|value_298|
|key_8958|value_837|
+--------+---------+
only showing top 2 rows



In [7]:
# Repartition into 4 partitions before saving; without this the write has no parallelism.
# Any existing output at the target path is overwritten.
key_values.repartition(4).write.mode('overwrite').parquet('key_values.parquet')
# Drop the name; the data is read back from the Parquet output later.
del key_values

In [8]:
# List the Parquet output directory: note the 4 part-files, one per partition
# requested by repartition(4) before the write.
!ls key_values.parquet

_SUCCESS
part-r-00000-c6dbe4c3-0861-4178-b5a9-50777eb27c74.snappy.parquet
part-r-00001-c6dbe4c3-0861-4178-b5a9-50777eb27c74.snappy.parquet
part-r-00002-c6dbe4c3-0861-4178-b5a9-50777eb27c74.snappy.parquet
part-r-00003-c6dbe4c3-0861-4178-b5a9-50777eb27c74.snappy.parquet


In [9]:
# Load the Parquet data back and expose it to Spark SQL under the name `key_values`.
key_values = spark.read.parquet('key_values.parquet')
key_values.createOrReplaceTempView('key_values')

# Count rows per key and keep the 10 keys with the most rows.
top_keys_query = """
SELECT  key, 
        COUNT(*) AS num_of_values
FROM key_values
GROUP BY key
ORDER BY num_of_values DESC
LIMIT 10
"""
top_keys = spark.sql(top_keys_query)
top_keys.show()

+--------+-------------+
|     key|num_of_values|
+--------+-------------+
|key_6770|            7|
|key_3921|            7|
|key_4540|            6|
|key_5040|            6|
|key_7186|            6|
| key_393|            6|
|key_1036|            5|
|key_9636|            5|
|key_7429|            5|
|  key_27|            5|
+--------+-------------+

