The aim of this notebook is to demonstrate the semantics of schema evolution for parquet files, using both Spark and parquet2hive + Presto. Note that parquet2hive simply reads the schema *from the most recently created file*, so in some cases the behavior shown here could be changed, without changing Presto's underlying facilities, by having parquet2hive read the schema from *all* files instead.

In this notebook I am running Spark locally, and using a remote Presto cluster. To connect to this cluster I'm using parquet2hive_server [0], which is just a simple API for parquet2hive on the remote cluster. To run this notebook successfully, you'll need to run the following on the Presto cluster:

```
sudo pip install parquet2hive_server
start_parquet2hive_server
```

[0] http://www.github.com/fbertsch/parquet2hive_server

In [None]:
from parquet2hive_server.client import Parquet2HiveClient
from pyhive import presto
from pprint import pprint

presto_dns = 'ec2-54-149-100-125.us-west-2.compute.amazonaws.com'

client = Parquet2HiveClient(presto_dns + ':5129')

In [None]:
import boto3

bucket, prefix = "telemetry-test-bucket", "schema_evolution"
dataset = "s3://{}/{}/".format(bucket, prefix)
partition = '/type='

# Start from a clean slate: remove whatever already exists under the prefix so
# that the parquet writes in the following cells target empty locations.
s3 = boto3.resource('s3')
# NOTE(review): only a single list_objects call is made, so only the keys
# returned by that one response are deleted -- confirm this is enough for
# the number of objects a run leaves behind.
objects_to_delete = s3.meta.client.list_objects(Bucket=bucket, Prefix=prefix)

delete_keys = {}
delete_keys['Objects'] = [{'Key': obj['Key']} for obj in objects_to_delete.get('Contents', [])]

# Skip the request when there is nothing to delete rather than sending an
# empty batch and relying on the resulting error being swallowed.
if delete_keys['Objects']:
    try:
        _ = s3.meta.client.delete_objects(Bucket=bucket, Delete=delete_keys)
    except Exception as e:
        # Cleanup stays best-effort, but the failure is reported: leftover
        # files would otherwise surface later as confusing write errors.
        print('Could not delete existing objects under {}: {}'.format(dataset, e))

In [None]:
# Open a connection to the Presto host (port 8889) and create the single
# cursor that later cells pass to get_schema() and execute().
conn = presto.connect(host=presto_dns, port='8889')
cursor = conn.cursor()

def get_schema(_cursor, _v):
    """Return the schema of table ``schema_evolution_<_v>`` as text.

    The result is formatted similarly to spark's dataframe.printSchema():
    a ``root`` line followed by one `` |-- <name>: <type>`` line per row
    returned by ``describe``. The string is returned, not printed.

    _cursor -- DB-API style cursor providing execute() and fetchall()
    _v      -- version suffix of the table name, e.g. 'v1'
    """
    _cursor.execute('describe schema_evolution_{}'.format(_v))
    lines = ['root']
    for row in _cursor.fetchall():
        # Only the first two fields (name, type) are used; any further
        # fields in the row are ignored.
        lines.append(' |-- {}: {}'.format(row[0], row[1]))
    # Plain newline separator: the previous '\\\n' left a stray backslash
    # at the end of every line of the output.
    return '\n'.join(lines)

def execute(_cursor, _query):
    """Run ``_query`` and return its result set as text.

    Each result row becomes one line of comma-separated ``column=value``
    pairs, with column names taken from the cursor's ``description``.
    Lines are separated by a newline; an empty result yields ''.

    _cursor -- DB-API style cursor providing execute(), fetchall() and description
    _query  -- SQL text to run
    """
    _cursor.execute(_query)
    results = _cursor.fetchall()
    colnames = [c[0] for c in _cursor.description]
    lines = []
    for res in results:
        lines.append(', '.join('{}={}'.format(c, r) for c, r in zip(colnames, res)))
    # Plain newline separator: the previous '\\\n' left a stray backslash
    # at the end of every row of the output.
    return '\n'.join(lines)

## Adding a Column

In [None]:
v = 'v1'

In [None]:
rdd = sc.parallelize([(0,),(1,)], 1)
df = sqlContext.createDataFrame(rdd, ['id'])
df.write.parquet(dataset + v + partition + '1')

In [None]:
rdd = sc.parallelize([(0,0),(1,1)], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'score'])
df.write.parquet(dataset + v + partition  + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

## Removing a Column

In [None]:
v = 'v2'

rdd = sc.parallelize([(0,0),(1,1)], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'score'])
df.write.parquet(dataset + v + partition  + '1')

In [None]:
rdd = sc.parallelize([(0,),(1,)], 1)
df = sqlContext.createDataFrame(rdd, ['id'])
df.write.parquet(dataset + v + partition + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

## Renaming a Column

In [None]:
v = 'v3'

In [None]:
rdd = sc.parallelize([(0,),(1,)], 1)
df = sqlContext.createDataFrame(rdd, ['id'])
df.write.parquet(dataset + v + partition + '1')

In [None]:
rdd = sc.parallelize([(0,),(1,)], 1)
df = sqlContext.createDataFrame(rdd, ['score'])
df.write.parquet(dataset + v + partition + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

## Replace Column

Note that this is similar to "rename column", but the new data has a different type.

In [None]:
v = 'v4'

rdd = sc.parallelize([(0,),(1,)], 1)
df = sqlContext.createDataFrame(rdd, ['id'])
df.write.parquet(dataset + v + partition + '1')

In [None]:
rdd = sc.parallelize([('a',),('b',)], 1)
df = sqlContext.createDataFrame(rdd, ['score'])
df.write.parquet(dataset + v + partition + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
# Leave the result as the cell's value so the collected rows are displayed,
# as in the other sections, and so `df` is not rebound to a list.
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the later cells.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

## Transpose Columns

In [None]:
v = 'v5'

rdd = sc.parallelize([(0,'a','b')], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'transpose_a', 'transpose_b'])
df.write.parquet(dataset + v + partition + '1')

In [None]:
rdd = sc.parallelize([(1,'b','a')], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'transpose_b', 'transpose_a'])
df.write.parquet(dataset + v + partition + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').select('transpose_a','transpose_b').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').select('transpose_a','transpose_b').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the schema cell above.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

## Transpose, Delete and Add Columns

In [None]:
v = 'v6'

rdd = sc.parallelize([(0,'r','t')], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'removed', 'transposed'])
df.write.parquet(dataset + v + partition + '1')

In [None]:
rdd = sc.parallelize([(1,'t','a')], 1)
df = sqlContext.createDataFrame(rdd, ['id', 'transposed', 'added'])
df.write.parquet(dataset + v + partition + '2')

### Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

### Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the schema cell above.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

# Nested Row Type - Adding a Subcolumn

In [None]:
from pyspark.sql.types import *

v = 'v7'

df = sqlContext.createDataFrame([[[1, 'e']]], StructType([
    StructField("nested", 
                StructType([
                        StructField("id", LongType()), 
                        StructField("exists", StringType())
                    ])
               )
]))

df.write.parquet(dataset + v + partition + '1')

In [None]:
df = sqlContext.createDataFrame([[[1, 'e', 'a']]], StructType([
    StructField("nested", 
                StructType([
                        StructField("id", LongType()), 
                        StructField("exists", StringType()),
                        StructField("added", StringType())
                    ])
               )
]))
df.write.parquet(dataset + v + partition + '2')

## Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

## Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the schema cell above.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))

# Nested Row Type - Removing a Subcolumn

In [None]:
v = 'v8'

df = sqlContext.createDataFrame([[[1, 'e', 'r']]], StructType([
    StructField("nested", 
                StructType([
                        StructField("id", LongType()), 
                        StructField("exists", StringType()),
                        StructField("removed", StringType())
                    ])
               )
]))

df.write.parquet(dataset + v + partition + '1')

In [None]:
df = sqlContext.createDataFrame([[[1, 'e']]], StructType([
    StructField("nested", 
                StructType([
                        StructField("id", LongType()), 
                        StructField("exists", StringType())
                    ])
               )
]))

df.write.parquet(dataset + v + partition + '2')

## Spark

In [None]:
sqlContext.read.load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.load(dataset + v, 'parquet').collect()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').printSchema()

In [None]:
sqlContext.read.option("mergeSchema", "true").load(dataset + v, 'parquet').collect()

## Presto

In [None]:
client.load(dataset=dataset, dv=v)

In [None]:
print(get_schema(cursor, v))

In [None]:
# Call form of print: valid on both Python 2 and 3, and matches the schema cell above.
print(execute(cursor, "SELECT * FROM schema_evolution_{}".format(v)))