In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf

In [2]:
# Configure Spark settings (optional)
conf = SparkConf()
conf.set('spark.executor.memory', '1g')
conf.set('spark.cores.max', '2')

conf.setAppName('Spark Tutorial')

<pyspark.conf.SparkConf at 0x107393f60>

In [3]:
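# Initialize the Spark context. Run this cell only once: creating a second
# SparkContext while one is already active raises an error.
# `conf` must be passed by keyword; the second positional argument is appName.
sc = SparkContext('local', conf=conf)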

## Data transformation

* A transformation takes one RDD and creates a new RDD; the original RDD is left unchanged.
* The operation is applied to one element at a time.

### Map transformation

`newrdd = rdd.map(function)`

* Works like the `Map` step in MapReduce.

* The result can be of a different type from the input.

* Use cases:
    * Data standardization
    * Element-level computation
    * Adding new attributes
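The function passed to `map` is an ordinary Python function, so it can be checked locally before it is shipped to Spark. A minimal sketch, assuming the comma-separated auto data used later in this notebook; `parse_auto` is a hypothetical helper name. It standardizes each line (a string) into a tuple, which also shows that the result type can differ from the input type:

```python
def parse_auto(line):
    """Standardize one CSV line into (make, body, hp) with hp as an int.

    Hypothetical helper: column positions follow the auto-data.csv header
    MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,...
    """
    fields = line.split(',')
    return (fields[0].strip().lower(), fields[4], int(fields[7]))


lines = [
    'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
    'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
]

# The built-in map applies the function element by element, as rdd.map would
parsed = list(map(parse_auto, lines))
print(parsed)  # [('subaru', 'hatchback', 69), ('chevrolet', 'hatchback', 48)]
```

On an RDD whose header row has been removed, the same function would be used as `rdd.map(parse_auto)`.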

### flatMap

`newrdd = rdd.flatMap(function)`

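* Works the same way as `map`, except that the function returns a sequence of values for each element and the results are flattened.

* Can therefore return more elements than the original RDD.

* Use it to break up the elements of the original RDD (e.g. split lines into words) and create a new RDD.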
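A plain-Python model of the difference, with lists standing in for RDDs (a sketch, not Spark code). `map` yields one output per input element, while `flatMap` flattens the per-element sequences; this is why the notebook's later comparison reports 198 elements for `map` but 2376 (198 lines × 12 fields) for `flatMap`:

```python
lines = ['subaru,gas,std', 'mazda,diesel']

# map: exactly one output element per input element (here, a list per line)
mapped = [line.split(',') for line in lines]

# flatMap: the per-element sequences are flattened into one collection
flat_mapped = [field for line in lines for field in line.split(',')]

print(mapped)       # [['subaru', 'gas', 'std'], ['mazda', 'diesel']]
print(flat_mapped)  # ['subaru', 'gas', 'std', 'mazda', 'diesel']
print(len(mapped), len(flat_mapped))  # 2 5
```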

### filter

`newrdd = rdd.filter(function)`
* Selects the elements of the RDD for which the function returns `True`.

### Set Operations

Set operations are performed on two RDDs.
* Union - Return a new dataset that contains the union of the elements in the source dataset and the argument. Duplicates are kept; call `distinct()` to remove them. `unionrdd = firstrdd.union(secondrdd)`
* Intersection - Return a new dataset that contains the intersection of the elements in the source dataset and the argument. The result contains no duplicates. `intersectionrdd = firstrdd.intersection(secondrdd)`
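The duplicate handling of the two operations can be modelled with plain Python lists (a sketch of the semantics only; Spark does not guarantee element order, so the results are sorted here before comparison):

```python
first = ['hanoi', 'hcm city', 'hanoi']
second = ['hanoi', 'singapore']

# union: a plain concatenation, duplicates are kept (like RDD.union)
union = first + second

# intersection: elements present in both, each reported once (like RDD.intersection)
intersection = sorted(set(first) & set(second))

# union followed by distinct() removes the duplicates
union_distinct = sorted(set(union))

print(union)           # ['hanoi', 'hcm city', 'hanoi', 'hanoi', 'singapore']
print(intersection)    # ['hanoi']
print(union_distinct)  # ['hanoi', 'hcm city', 'singapore']
```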

### Pair RDDs
A special type of RDD whose elements are key-value pairs (2-tuples in Python).

All transformations for regular RDDs are available for pair RDDs.

Some special operations:
* mapValues - transform each value without changing the key.
* flatMapValues - map each value to zero or more values, each paired with the original key.
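A plain-Python model of `mapValues` and `flatMapValues` on a list of `(key, value)` tuples (a sketch of the semantics, not Spark code; the sample pairs are illustrative). The `(value, 1)` form produced by the first step is the same trick the notebook later uses to compute averages per brand:

```python
pairs = [('subaru', 69), ('mazda', 68)]

# mapValues: only the value is transformed; the key is left unchanged
map_values = [(key, (value, 1)) for key, value in pairs]

# Illustrative data: a space-separated list of body styles per make
body_styles = [('toyota', 'sedan wagon'), ('bmw', 'sedan')]

# flatMapValues: each value yields zero or more values, each keeping its key
flat_map_values = [(key, style) for key, styles in body_styles
                   for style in styles.split()]

print(map_values)       # [('subaru', (69, 1)), ('mazda', (68, 1))]
print(flat_map_values)  # [('toyota', 'sedan'), ('toyota', 'wagon'), ('bmw', 'sedan')]
```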

## Actions

* Act on an RDD and produce a result (not an RDD).
* Trigger evaluation: transformations are lazy and are only computed when an action is called.
* Simple actions:
    * collect - return all elements in the RDD as a list. Use it to trigger execution or print values; since it brings every element to the driver, use it only on small RDDs.
    * count - return the number of elements in the RDD.
    * first - return the first element of the RDD.
    * take(n) - return the first n elements.
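To make lazy evaluation concrete, here is a toy, single-machine model in plain Python. `ToyRDD` is a hypothetical class, not part of PySpark: transformations only record a step, and every action replays the whole chain. Unlike Spark, it computes everything even for `first` and `take`, and it never caches results, so it also shows the recomputation that persistence avoids:

```python
class ToyRDD:
    """Hypothetical toy that mimics lazy RDD evaluation on one machine.

    Transformations record a step; actions run all recorded steps.
    """

    def __init__(self, data, steps=None):
        self._data = list(data)
        self._steps = steps or []

    # Transformations: return a new ToyRDD, nothing is computed yet
    def map(self, func):
        return ToyRDD(self._data, self._steps + [('map', func)])

    def filter(self, pred):
        return ToyRDD(self._data, self._steps + [('filter', pred)])

    def _compute(self):
        items = self._data
        for kind, func in self._steps:
            if kind == 'map':
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items

    # Actions: each one runs the full chain from the source data
    def collect(self):
        return self._compute()

    def count(self):
        return len(self._compute())

    def first(self):
        return self._compute()[0]

    def take(self, n):
        return self._compute()[:n]


calls = []


def double(x):
    calls.append(x)  # records every time the function actually runs
    return x * 2


rdd = ToyRDD([1, 3, 2, 5]).map(double).filter(lambda x: x > 4)
print(len(calls))     # 0: no action has run yet
print(rdd.collect())  # [6, 10]
print(rdd.count())    # 2
print(len(calls))     # 8: the chain was recomputed for the second action
```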

### reduce
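* Performs an operation across all elements of an RDD:
    * sum, count, etc.
* The operation is a function that takes two values and returns one value of the same type.
* The function is applied repeatedly: it combines two elements, then combines that result with the next element, and so on until one value is left.
* Because Spark reduces each partition separately and then merges the partial results, the function should be commutative and associative.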
```
if the inputrdd = [a, b, c, d, e] and the function is func(x, y)
=> func( func( func( func(a, b), c), d), e)
```
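The call pattern above can be reproduced with Python's `functools.reduce`, using a function that builds a string so the grouping is visible. The two-partition split `[a, b, c]` / `[d, e]` is an assumption for illustration; it shows why the grouping can differ in Spark and why an associative, commutative function such as addition is needed to get the same answer either way:

```python
from functools import reduce


def func(x, y):
    # Builds a string so the order of the calls is visible
    return f'f({x}, {y})'


elements = ['a', 'b', 'c', 'd', 'e']

# A single partition: a left fold over all elements
sequential = reduce(func, elements)
print(sequential)  # f(f(f(f(a, b), c), d), e)

# Two partitions, [a, b, c] and [d, e]: reduce each, then merge the results
partials = [reduce(func, elements[:3]), reduce(func, elements[3:])]
partitioned = reduce(func, partials)
print(partitioned)  # f(f(f(a, b), c), f(d, e))


def add(x, y):
    return x + y


# Addition is associative and commutative, so the grouping does not matter
nums = [3, 5, 4, 7, 4]
whole = reduce(add, nums)
split = reduce(add, [reduce(add, nums[:3]), reduce(add, nums[3:])])
print(whole, split)  # 23 23
```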

### aggregate
* Performs a computation on each partition in parallel and then combines the partial results.
* A sequence operation (seqOp) runs on each partition.
* A combine operation (combOp) merges the per-partition results.
* Can compute several aggregates in one pass (e.g. a sum and a product).
* Takes an initial (zero) value for each computation; it should be the identity value of that operation (0 for a sum, 1 for a product), because it is used once per partition and again when combining.
    * rdd = [3, 5, 4, 7, 4]
    * seqOp = (lambda x, y: (x[0] + y, x[1] * y))
    * combOp = (lambda x, y: (x[0] + y[0], x[1] * y[1]))
    * rdd.aggregate((0, 1), seqOp, combOp)
    * With 2 partitions, [3, 5, 4] and [7, 4], the sequence operation produces [(12, 60), (11, 28)] and the combine operation produces (23, 1680).
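The worked example can be checked with a plain-Python model of `aggregate` over explicitly given partitions. `simulate_aggregate` is a hypothetical helper, not a Spark API: it folds each partition with `seq_op` from the zero value, then folds the partial results with `comb_op`, again from the zero value. The last call shows what goes wrong with a non-identity zero such as `(1, 1)`: the extra 1 is added once per partition and once more when combining:

```python
from functools import reduce


def simulate_aggregate(partitions, zero, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate over given partitions."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return partials, reduce(comb_op, partials, zero)


def seq_op(acc, y):
    # Running (sum, product) inside one partition
    return (acc[0] + y, acc[1] * y)


def comb_op(a, b):
    # Merge two partial (sum, product) results
    return (a[0] + b[0], a[1] * b[1])


partitions = [[3, 5, 4], [7, 4]]
partials, result = simulate_aggregate(partitions, (0, 1), seq_op, comb_op)
print(partials)  # [(12, 60), (11, 28)]
print(result)    # (23, 1680)

# A non-identity zero value is counted once per partition plus once more
_, wrong = simulate_aggregate(partitions, (1, 1), seq_op, comb_op)
print(wrong)     # (26, 1680)
```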
    
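### Pair RDD operations
* `countByKey` - an action that counts the elements for each key and returns the counts to the driver
* `groupByKey` - groups all values with the same key; aggregations such as sum or average can then be applied to each group
* `reduceByKey` - performs a reduce on the values of each key
* `aggregateByKey` - like `aggregate`, but applied to the values of each key (takes a zero value, a sequence operation and a combine operation)
* Apart from `countByKey`, these are transformations: they return a new pair RDD.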
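A plain-Python model of these per-key operations on a list of `(make, hp)` pairs (a sketch of the semantics only; `reduce_by_key` and `group_by_key` are hypothetical helpers and the sample values are illustrative). The average-per-key step uses the same `(sum, count)` pattern as the notebook's later brand-average cells. In Spark, `reduceByKey` also merges values inside each partition before shuffling, which is why it is usually preferred over `groupByKey` followed by a reduction:

```python
from collections import Counter, defaultdict

# Illustrative (make, hp) pairs
pairs = [('subaru', 69), ('mazda', 68), ('subaru', 111), ('mazda', 101)]


def reduce_by_key(pairs, func):
    """Model of reduceByKey: merge all values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


def group_by_key(pairs):
    """Model of groupByKey: collect all values of each key into a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


counts = Counter(key for key, _ in pairs)          # like countByKey
groups = group_by_key(pairs)                       # {'subaru': [69, 111], 'mazda': [68, 101]}
sums = reduce_by_key(pairs, lambda x, y: x + y)    # {'subaru': 180, 'mazda': 169}

# Average per key: pair each value with a count of 1, then add both parts
totals = reduce_by_key([(k, (v, 1)) for k, v in pairs],
                       lambda x, y: (x[0] + y[0], x[1] + y[1]))
averages = {k: total / n for k, (total, n) in totals.items()}
print(averages)  # {'subaru': 90.0, 'mazda': 84.5}
```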


## Loading and Storing data

* RDDs can be created from a number of sources:
    * Text files
    * JSON
    * `parallelize()` on collections
    * Sequence files
* For optimization, use language-specific libraries for persistence rather than the Spark library.

## Partitioning

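* By default, all RDDs are partitioned.
    * The default number of partitions is controlled by the `spark.default.parallelism` parameter.
    * On a cluster, the default is the total number of cores available across the entire cluster; with the `'local'` master used here it is 1.
* The number of partitions can be specified when an RDD is created (e.g. the second argument of `parallelize()`).
* Derived RDDs usually keep the same number of partitions as their source RDD.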

## Persistence

* By default, Spark computes an RDD whenever it is required and drops it once the action is over.
    * It therefore recomputes the whole chain of RDDs each time a different action is performed.
* Persistence allows an intermediate RDD to be kept (with `cache()` or `persist()`) so it does not need to be recomputed.

## Advanced Spark

### Broadcast variables

* A read-only variable that is shipped to every node once, instead of with every task
* Typically used for lookup tables and similar reference data
* Spark optimizes distribution and storage for better performance

### Accumulators

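* A shared variable that tasks on all nodes can add to; only the driver can read its value
* Useful for counters and sums computed as a side effect of other operations, which would otherwise need a separate reduce
* Spark takes care of distribution and of concurrent updates, so there are no race conditions
* Updates made inside transformations may be applied more than once if a task is re-executed; only updates made inside actions are guaranteed to be applied exactly once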

### Example

**Loading data from text file**

In [4]:
# Load the file lazily: nothing is read until an action (such as count) runs
autoData = sc.textFile('data/auto-data.csv')
# keep the RDD in memory once it has been computed
autoData.cache()

data/auto-data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
autoData.count()

198

In [6]:
autoData.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [7]:
autoData.take(5)

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

In [8]:
for line in autoData.collect():
    print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389
honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399
nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499
dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189
dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295
toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338
dodge,gas,std,two,hatchback,fwd,four,68,5500,31,38,6377

**Loading data from collections**

In [9]:
collData = sc.parallelize([1, 3, 2, 5, 7, 8])
collData.cache()
collData.count()

6

**Data transformation**

In [10]:
# Map and create a new RDD from autoData
tsvData = autoData.map(lambda x: x.replace(',', '\t'))
tsvData.take(5)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348']

In [11]:
# Filter and create a new RDD
toyotaData = autoData.filter(lambda x: 'toyota' in x)
toyotaData.take(5)

['toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338',
 'toyota,gas,std,four,hatchback,fwd,four,62,4800,31,38,6488',
 'toyota,gas,std,four,wagon,fwd,four,62,4800,31,37,6918',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,30,37,6938']

In [19]:
# map vs flatMap
words = autoData.map(lambda x: x.split(','))
print('output by map:')
print(words.take(5))
print(words.count())

words = autoData.flatMap(lambda x: x.split(','))
print('output by flatMap:')
print(words.take(5))
print(words.count())

output by map:
[['MAKE', 'FUELTYPE', 'ASPIRE', 'DOORS', 'BODY', 'DRIVE', 'CYLINDERS', 'HP', 'RPM', 'MPG-CITY', 'MPG-HWY', 'PRICE'], ['subaru', 'gas', 'std', 'two', 'hatchback', 'fwd', 'four', '69', '4900', '31', '36', '5118'], ['chevrolet', 'gas', 'std', 'two', 'hatchback', 'fwd', 'three', '48', '5100', '47', '53', '5151'], ['mazda', 'gas', 'std', 'two', 'hatchback', 'fwd', 'four', '68', '5000', '30', '31', '5195'], ['toyota', 'gas', 'std', 'two', 'hatchback', 'fwd', 'four', '62', '4800', '35', '39', '5348']]
198
output by flatMap:
['MAKE', 'FUELTYPE', 'ASPIRE', 'DOORS', 'BODY']
2376


In [27]:
# distinct
collData = sc.parallelize([1, 3, 1, 1, 5, 3, 8])
for numdata in collData.collect():
    print(numdata)

# collect() already returns a Python list, so it can be printed directly
print(collData.collect())
# the order of the distinct() results is not guaranteed
print(collData.distinct().collect())

1
3
1
1
5
3
8
[1, 3, 1, 1, 5, 3, 8]
[1, 3, 5, 8]


In [29]:
# Set operations
cities1 = sc.parallelize(['hanoi', 'hcm city', 'vung tau', 'khanh hoa', 'ha long', 'ha giang'])
cities2 = sc.parallelize(['hanoi', 'vieng chan', 'kuala lumpua', 'singapore', 'hcm city', 'vung tau'])

# union keeps duplicates, so call distinct(); intersection already removes them
print(cities1.union(cities2).distinct().collect())
print(cities1.intersection(cities2).collect())

['hanoi', 'vung tau', 'khanh hoa', 'ha long', 'ha giang', 'vieng chan', 'hcm city', 'kuala lumpua', 'singapore']
['hanoi', 'vung tau', 'hcm city']


**Actions**

In [30]:
# reduce
collData.reduce(lambda x, y: x + y)

22

In [31]:
# find the shortest line
autoData.reduce(lambda x, y: x if len(x) < len(y) else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

In [34]:
# aggregation - sum
collData = sc.parallelize([1, 3, 1, 1, 5, 3, 8, 4, 6, 12])
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
collData.aggregate(0, seqOp, combOp)

44

In [36]:
# aggregation - sum and product at the same time
seqOp = (lambda x, y: (x[0] + y, x[1] * y))
combOp = (lambda x, y: (x[0] + y[0], x[1] * y[1]))
collData.aggregate((0, 1), seqOp, combOp)

(44, 103680)

**Function**

In [41]:
# clean and transform an RDD
def cleanRDD(auto_str):
    att_list = auto_str.split(',')

    # convert `doors` to a number
    if att_list[3] == 'two':
        att_list[3] = '2'
    elif att_list[3] == 'four':
        att_list[3] = '4'

    # convert `drive` to upper case
    att_list[5] = att_list[5].upper()

    return ','.join(att_list)

cleaned_data = autoData.map(cleanRDD)
cleaned_data.collect()

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,2,hatchback,FWD,four,68,5500,37,41,5389',
 'honda,gas,std,2,hatchback,FWD,four,60,5500,38,42,5399',
 'nissan,gas,std,2,sedan,FWD,four,69,5200,31,37,5499',
 'dodge,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572',
 'plymouth,gas,std,2,hatchback,FWD,four,68,5500,37,41,5572',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,31,38,6095',
 'mitsubishi,gas,std,2,hatchback,FWD,four,68,5500,31,38,6189',
 'dodge,gas,std,4,hatchback,FWD,four,68,5500,31,38,6229',
 'plymouth,gas,std,4,hatchback,FWD,four,68,5500,31,38,6229',
 'chevrolet,gas,std,2,hatchback,FWD,four,70,5400,38,43,6295',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,31,38,6338',
 'dodge,gas,std,2,hatch

In [42]:
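# get the average city miles per gallon (MPG-CITY, column 9) for all cars
def get_mpg(auto_str):
    att_list = auto_str.split(',')
    # the header row holds 'MPG-CITY', which is not a number
    if att_list[9].isdigit():
        return int(att_list[9])
    return 0

# map every line to an int first, so reduce only ever combines ints
total_mpg = autoData.map(get_mpg).reduce(lambda x, y: x + y)
total_mpg / (autoData.count() - 1)  # subtract 1 for the header row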

25.15228426395939

**Working with pair RDDs**

In [44]:
# Create a pair RDD of auto brand and horsepower
cyl_data = autoData.map(lambda x: (x.split(',')[0], x.split(',')[7]))
print(cyl_data.take(5))
cyl_data.keys().collect()

[('MAKE', 'HP'), ('subaru', '69'), ('chevrolet', '48'), ('mazda', '68'), ('toyota', '62')]


['MAKE',
 'subaru',
 'chevrolet',
 'mazda',
 'toyota',
 'mitsubishi',
 'honda',
 'nissan',
 'dodge',
 'plymouth',
 'mazda',
 'mitsubishi',
 'dodge',
 'plymouth',
 'chevrolet',
 'toyota',
 'dodge',
 'honda',
 'toyota',
 'honda',
 'chevrolet',
 'nissan',
 'mitsubishi',
 'dodge',
 'plymouth',
 'mazda',
 'isuzu',
 'mazda',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'mitsubishi',
 'subaru',
 'nissan',
 'subaru',
 'honda',
 'toyota',
 'honda',
 'honda',
 'nissan',
 'nissan',
 'mazda',
 'subaru',
 'nissan',
 'subaru',
 'dodge',
 'plymouth',
 'mitsubishi',
 'toyota',
 'subaru',
 'volkswagen',
 'toyota',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'dodge',
 'plymouth',
 'volkswagen',
 'volkswagen',
 'nissan',
 'subaru',
 'toyota',
 'mitsubishi',
 'volkswagen',
 'toyota',
 'nissan',
 'toyota',
 'toyota',
 'mazda',
 'volkswagen',
 'mitsubishi',
 'toyota',
 'honda',
 'mazda',
 'dodge',
 'plymouth',
 'toyota',
 'nissan',
 'honda',
 'subaru',
 'toyota',
 'mitsubishi',
 'mitsubishi',
 'toyota',
 'vo

In [45]:
# remove the header row
header = cyl_data.first()

cyl_hp = cyl_data.filter(lambda x: x != header)
cyl_hp.take(5)

[('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62'),
 ('mitsubishi', '68')]

In [46]:
# pair each HP value (converted to int) with a count of 1, then reduce by key
# to get (total HP, number of cars) for each brand
brand_values = cyl_hp.mapValues(lambda x: (int(x), 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
brand_values.collect()

[('subaru', (1035, 12)),
 ('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('toyota', (2969, 32)),
 ('mitsubishi', (1353, 13)),
 ('honda', (1043, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('saab', (760, 6)),
 ('peugot', (1098, 11)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', (175, 1)),
 ('porsche', (764, 4)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3))]

In [48]:
# finding the average
brand_values.mapValues(lambda x: int(x[0]) / int(x[1])).collect()

[('subaru', 86.25),
 ('chevrolet', 62.666666666666664),
 ('mazda', 86.875),
 ('toyota', 92.78125),
 ('mitsubishi', 104.07692307692308),
 ('honda', 80.23076923076923),
 ('nissan', 102.55555555555556),
 ('dodge', 84.375),
 ('plymouth', 86.71428571428571),
 ('isuzu', 84.0),
 ('volkswagen', 81.08333333333333),
 ('saab', 126.66666666666667),
 ('peugot', 99.81818181818181),
 ('volvo', 128.0),
 ('alfa-romero', 125.33333333333333),
 ('audi', 114.5),
 ('bmw', 138.875),
 ('mercury', 175.0),
 ('porsche', 191.0),
 ('mercedes-benz', 146.25),
 ('jaguar', 204.66666666666666)]

**Advanced Spark**

In [51]:
# Initialize the accumulators
sedan_count = sc.accumulator(0)
hatchback_count = sc.accumulator(0)

# set broadcast variables
sedan_text = sc.broadcast('sedan')
hatchback_text = sc.broadcast('hatchback')

def split_line(line):
    # `global` is needed because `+=` rebinds the accumulator names
    global sedan_count
    global hatchback_count

    # use the broadcast variables for the comparison and update the accumulators
    if sedan_text.value in line:
        sedan_count += 1
    elif hatchback_text.value in line:
        hatchback_count += 1
    return line.split(',')

# now do the map; the accumulators are only updated once an action (count) runs
split_data = autoData.map(split_line)
print(split_data.count())
# read the accumulated values on the driver
print(sedan_count.value, hatchback_count.value)



198
92 67


In [57]:
# Partitions
print(collData.getNumPartitions())

# specify the number of partitions
coll_data = sc.parallelize([1, 3, 1, 1, 5, 3, 8, 4, 6, 12], 2)
coll_data.cache()
print(coll_data.count())
print(coll_data.getNumPartitions())

1
10
2


## Spark SQL

### Overview
* A library built on Spark Core that supports SQL-like queries on structured data
* Seamlessly mixes SQL queries with Spark programs
* Helps 'mix' and 'match' RDBMS and NoSQL data sources

### Operations supported by data frames:
* `filter`
* `join` - join two data frames based on a common column
* `groupBy`
* `agg`
* `registerTempTable` - register a data frame as a table within the SQLContext (deprecated since Spark 2.0 in favor of `createOrReplaceTempView`)
* Operations can be nested (chained).

### SQLContext
* All Spark SQL functionality is accessed through a SQLContext
* Created from a SparkContext
* Data frames are created through the SQLContext
* Provides a standard interface to work across different data sources
* Since Spark 2.0, `SparkSession` is the recommended entry point; `SQLContext` is kept for backward compatibility

### Example

In [58]:
# Create a SQLContext from the SparkContext
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

**Working with data frames**

In [59]:
# Create a data frame from a JSON file
emp_df = sqlc.read.json('data/customerData.json')
emp_df.show()
emp_df.printSchema()

+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
| 26|   100|  male|  Burks Velasquez|  2700|
| 51|   100|female|    June Rutledge|  4300|
| 44|   200|  male|    Nielsen Knapp|  6500|
+---+------+------+-----------------+------+

root
 |-- age: string (nullable = true)
 |-- deptid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)



In [60]:
# Select a single column (the DataFrame equivalent of SELECT name)
emp_df.select('name').show()


+-----------------+
|             name|
+-----------------+
|Benjamin Garrison|
|    Holland Drake|
|  Burks Velasquez|
|    June Rutledge|
|    Nielsen Knapp|
+-----------------+



In [62]:
emp_df.filter(emp_df['age'] == 40).show()

+---+------+------+-------------+------+
|age|deptid|gender|         name|salary|
+---+------+------+-------------+------+
| 40|   200|  male|Holland Drake|  4500|
+---+------+------+-------------+------+



In [63]:
emp_df.groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|female|    1|
|  male|    4|
+------+-----+



In [66]:
emp_df.groupBy('deptid').agg({'salary': 'avg', 'age': 'max'}).show()

+------+------------------+--------+
|deptid|       avg(salary)|max(age)|
+------+------------------+--------+
|   200|            5500.0|      44|
|   100|3333.3333333333335|      51|
+------+------------------+--------+



In [68]:
# Create a df from a list
dept_list = [{ "name": "Sales", "id": '100'}, { "name": "Engineering", "id": "200"}]
dept_df = sqlc.createDataFrame(dept_list)
dept_df.show()

+---+-----------+
| id|       name|
+---+-----------+
|100|      Sales|
|200|Engineering|
+---+-----------+



In [69]:
# join the data frames
emp_df.join(dept_df, dept_df.id == emp_df.deptid).show()

+---+------+------+-----------------+------+---+-----------+
|age|deptid|gender|             name|salary| id|       name|
+---+------+------+-----------------+------+---+-----------+
| 51|   100|female|    June Rutledge|  4300|100|      Sales|
| 26|   100|  male|  Burks Velasquez|  2700|100|      Sales|
| 32|   100|  male|Benjamin Garrison|  3000|100|      Sales|
| 44|   200|  male|    Nielsen Knapp|  6500|200|Engineering|
| 40|   200|  male|    Holland Drake|  4500|200|Engineering|
+---+------+------+-----------------+------+---+-----------+



In [70]:
# cascading operations
emp_df.filter(emp_df['age'] >= 30).join(dept_df, dept_df.id == emp_df.deptid).groupBy('deptid').agg({'salary': 'avg', 'age': 'max'}).show()

+------+-----------+--------+
|deptid|avg(salary)|max(age)|
+------+-----------+--------+
|   200|     5500.0|      44|
|   100|     3650.0|      51|
+------+-----------+--------+



In [71]:
# Register a df as a table
# (registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it)
emp_df.registerTempTable('employees')
sqlc.sql('select * from employees where salary > 4000').show()

+---+------+------+-------------+------+
|age|deptid|gender|         name|salary|
+---+------+------+-------------+------+
| 40|   200|  male|Holland Drake|  4500|
| 51|   100|female|June Rutledge|  4300|
| 44|   200|  male|Nielsen Knapp|  6500|
+---+------+------+-------------+------+



In [72]:
# To pandas data frame
emp_pdf = emp_df.toPandas()
for index, row in emp_pdf.iterrows():
    print(row['salary'])

3000
4500
2700
4300
6500


**Creating a data frame from an RDD**

In [73]:
from pyspark.sql import Row
lines = sc.textFile('data/auto-data.csv')

In [74]:
# remove the header line (the only line containing 'FUELTYPE')
data_lines = lines.filter(lambda x: 'FUELTYPE' not in x)
data_lines.count()

197

In [76]:
parts = data_lines.map(lambda x: x.split(','))
auto_map = parts.map(lambda x: Row(make = x[0], body = x[4], hp = int(x[7])))
auto_map.collect()

[Row(body='hatchback', hp=69, make='subaru'),
 Row(body='hatchback', hp=48, make='chevrolet'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=60, make='honda'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=70, make='chevrolet'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=58, make='honda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=76, make='honda'),
 Row(body='sedan', hp=70, make='chevrolet'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, mak

In [79]:
# Infer schema and register the df as a table
auto_df = sqlc.createDataFrame(auto_map)
auto_df.registerTempTable('autos')
sqlc.sql('select * from autos where hp > 100').show()

+---------+---+----------+
|     body| hp|      make|
+---------+---+----------+
|hatchback|102|mitsubishi|
|hatchback|102|     dodge|
|hatchback|102|  plymouth|
|  hardtop|116|    toyota|
|    sedan|116|mitsubishi|
|    sedan|116|mitsubishi|
|    sedan|112|    toyota|
|hatchback|112|    toyota|
|  hardtop|116|    toyota|
|hatchback|116|mitsubishi|
|hatchback|116|    toyota|
|hatchback|101|     mazda|
|  hardtop|116|    toyota|
|    sedan|111|    subaru|
|hatchback|116|    toyota|
|    wagon|111|    subaru|
|hatchback|101|     mazda|
|hatchback|110|      saab|
|    sedan|110|      saab|
|hatchback|145|mitsubishi|
+---------+---+----------+
only showing top 20 rows

