In [1]:
import findspark
# Point findspark at the Spark installation so that `pyspark` can be imported below.
# NOTE(review): absolute, machine-specific path — adjust it to the local Spark install.
findspark.init('/opt/spark/spark-2.2.0-bin-hadoop2.7/')

In [2]:
from pyspark import SparkContext
# Start a local Spark context with three worker threads (app name: 'three tasks').
sc = SparkContext(master='local[3]', appName='three tasks')

# Sample data: a small list of integers.
numbers = [1, 4, 6, 2, 9, 10]

# Distribute the list and fold it with a function that records every merge as a
# "(left, right)" string. The function is neither associative nor commutative,
# so the nesting of the printed result shows how the elements were grouped and
# combined rather than a single well-defined value.
rdd_numbers = sc.parallelize(numbers)
rdd_reduce = rdd_numbers.reduce(
    lambda x, y: "(" + ", ".join((str(x), str(y))) + ")"
)
print(rdd_reduce)

(((1, 4), (6, 2)), (9, 10))


In [3]:
from pyspark.sql import SQLContext
# Wrap the SparkContext in a SQLContext; `sqlc` is used below to read files and run SQL queries.
sqlc = SQLContext(sc)

# SparkSQL with JSON input

In [18]:
# Create a Spark DataFrame from the JSON file "temp.json".
# NOTE(review): the outputs below contain mojibake (e.g. "AtlÃ©tico"), which suggests
# the text in the file is not encoded the way the reader assumes — confirm the encoding.
players = sqlc.read.json("temp.json")

In [19]:
# Print the column names, types and nullability of the `players` DataFrame.
players.printSchema()

root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)



In [20]:
# Expose the DataFrame to Spark SQL under the name "T_people".
# createOrReplaceTempView supersedes registerTempTable, which is deprecated since
# Spark 2.0; it is also safe to re-run because it replaces an existing view.
players.createOrReplaceTempView("T_people")

# Query the view with plain SQL and display the first five rows.
sqlc.sql("select * from T_people").show(5)

+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|                Club|ClubCountry|Competition|DateOfBirth|         FullName|IsCaptain|Number|Position|     Team|Year|
+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|Club AtlÃ©tico Ta...|  Argentina|  World Cup|   1905-5-5|     Ãngel Bossio|    false|      |      GK|Argentina|1930|
|Quilmes AtlÃ©tico...|  Argentina|  World Cup| 1908-10-23|     Juan Botasso|    false|      |      GK|Argentina|1930|
|          Boca Junio|  Argentina|  World Cup|  1907-2-23|   Roberto Cherro|    false|      |      FW|Argentina|1930|
|Central Norte TucumÃ|  Argentina|  World Cup|  1907-2-23|Alberto Chividini|    false|      |      DF|Argentina|1930|
|Club Atletico Est...|  Argentina|  World Cup|  1909-3-19|                 |    false|    10|      FW|Argentina|1930|
+--------------------+-----------+-----------+----------

In [21]:
# Display the first five rows straight from the DataFrame (no SQL needed).
players.show(5)

+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|                Club|ClubCountry|Competition|DateOfBirth|         FullName|IsCaptain|Number|Position|     Team|Year|
+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|Club AtlÃ©tico Ta...|  Argentina|  World Cup|   1905-5-5|     Ãngel Bossio|    false|      |      GK|Argentina|1930|
|Quilmes AtlÃ©tico...|  Argentina|  World Cup| 1908-10-23|     Juan Botasso|    false|      |      GK|Argentina|1930|
|          Boca Junio|  Argentina|  World Cup|  1907-2-23|   Roberto Cherro|    false|      |      FW|Argentina|1930|
|Central Norte TucumÃ|  Argentina|  World Cup|  1907-2-23|Alberto Chividini|    false|      |      DF|Argentina|1930|
|Club Atletico Est...|  Argentina|  World Cup|  1909-3-19|                 |    false|    10|      FW|Argentina|1930|
+--------------------+-----------+-----------+----------

In [23]:
# Select the distinct player names from 1930 only.
team1930 = sqlc.sql("select distinct FullName from T_people where Year == 1930")

# The result of a SQL query is a DataFrame; its `.rdd` attribute exposes the
# content as an RDD of Row objects, which can be mapped like any other RDD.
# FullName is nullable in the schema, so a missing name is rendered as "" instead
# of raising a TypeError when concatenating str with None.
playerNames = team1930.rdd.map(lambda p: "Name: " + (p.FullName or "")).collect()
for name in playerNames:
    print(name)

Name: Ãngel Bossio
Name: Roberto Cherro
Name: Juan Botasso
Name: 
Name: Alberto Chividini
Name: Juan Evaristo


In [25]:
# Convert the first five rows of the Spark DataFrame to a pandas DataFrame for display.
# Limiting on the Spark side before toPandas() avoids collecting the entire
# DataFrame to the driver only to show its head.
players.limit(5).toPandas()

Unnamed: 0,Club,ClubCountry,Competition,DateOfBirth,FullName,IsCaptain,Number,Position,Team,Year
0,Club AtlÃ©tico Talleres de Remedios de Escalada,Argentina,World Cup,1905-5-5,Ãngel Bossio,False,,GK,Argentina,1930
1,Quilmes AtlÃ©tico Club,Argentina,World Cup,1908-10-23,Juan Botasso,False,,GK,Argentina,1930
2,Boca Junio,Argentina,World Cup,1907-2-23,Roberto Cherro,False,,FW,Argentina,1930
3,Central Norte TucumÃ,Argentina,World Cup,1907-2-23,Alberto Chividini,False,,DF,Argentina,1930
4,Club Atletico Estudiantil PorteÃ±o,Argentina,World Cup,1909-3-19,,False,10.0,FW,Argentina,1930


In [27]:
# Alternative to step 23: collect the query result and print each Row object as-is.
for row in team1930.collect():
    print(row)

Row(FullName='Ãngel Bossio')
Row(FullName='Roberto Cherro')
Row(FullName='Juan Botasso')
Row(FullName='')
Row(FullName='Alberto Chividini')
Row(FullName='Juan Evaristo')


# Next, read from CSV

https://www.supergloo.com/fieldnotes/spark-sql-csv-examples-python/

In [33]:
# Load the Uber CSV with Spark's built-in CSV reader (available since Spark 2.0),
# replacing the legacy external 'com.databricks.spark.csv' source name.
# header=True takes column names from the first line; inferSchema=True infers column types.
df = sqlc.read.csv('Uber-Jan-Feb-FOIL.csv', header=True, inferSchema=True)

In [34]:
# Expose the CSV DataFrame to Spark SQL under the name "uber".
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df.createOrReplaceTempView("uber")


In [35]:
# Preview the first five rows of the CSV data.
df.show(5)

+-----------------------+--------+---------------+-----+----+
|dispatching_base_number|    date|active_vehicles|trips| _c4|
+-----------------------+--------+---------------+-----+----+
|                 B02512|1/1/2015|            190| 1132|null|
|                 B02765|1/1/2015|            225| 1765|null|
|                 B02764|1/1/2015|           3427|29421|null|
|                 B02682|1/1/2015|            945| 7679|null|
|                 B02617|1/1/2015|           1228| 9537|null|
+-----------------------+--------+---------------+-----+----+
only showing top 5 rows



In [36]:
# It is handy to know the schema: print the column names and their types.
# NOTE(review): the unnamed `_c4` column is presumably caused by a trailing
# delimiter in the CSV file — confirm against the raw data.
df.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)
 |-- _c4: string (nullable = true)



In [38]:
# More advanced SQL: rank the Uber bases by their total number of trips, busiest first.
# GROUP BY already yields exactly one row per base, so the former `distinct(...)`
# (which is a row-level DISTINCT, not a function call) was redundant and is dropped.
busiestUber = sqlc.sql("""select `dispatching_base_number`,
                                sum(`trips`) as cnt from uber group by `dispatching_base_number`
                                order by cnt desc""")
busiestUber.show()

+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+



In [39]:
# Busiest dates: total trips per date, top five in descending order.
# GROUP BY already yields exactly one row per date, so the former `distinct(...)`
# (which is a row-level DISTINCT, not a function call) was redundant and is dropped.
busiestDate = sqlc.sql("""select `date`,
                                sum(`trips`) as cnt from uber group by `date`
                                order by cnt desc limit 5""")
busiestDate.show()

+---------+------+
|     date|   cnt|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+



# Spark SQL JDBC

### connecting SQL Server
https://stackoverflow.com/questions/43946157/pyspark-to-read-data-from-sql-server

    query = "(SELECT top 10 * from users) as users"
    sqlc = SQLContext(sc)

    df = sqlc.read.format("jdbc").options(url="jdbc:sqlserver://mssqlserver:1433;database=user_management;user=pyspark;password=pyspark", dbtable=query).load()

### connecting MySQL
 
    dataframe_mysql = sqlc.read.format("jdbc").option("url", "jdbc:mysql://localhost/uber").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "trips").option("user", "root").option("password", "root").load()

### Use Apache Spark to connect to SQL Server, extract a table from SQL Server, and load the extracted rows into a Hive table:
https://community.hortonworks.com/articles/59205/spark-pyspark-to-extract-from-sql-server.html 

In [None]:
import os
from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext
 
# Spark configuration for the import job: sets the application name and turns on
# dynamic allocation together with the shuffle service.
conf = (SparkConf()
  .setAppName("data_import")
  .set("spark.dynamicAllocation.enabled","true")
  .set("spark.shuffle.service.enabled","true"))
# NOTE(review): constructing a SparkContext raises ValueError when another context
# is already active in this kernel (e.g. `sc` from the first cell) — stop that one
# first or run this cell in a fresh kernel.
sc = SparkContext(conf = conf)

# HiveContext gives access to Hive tables in addition to the regular SQL features.
sqlctx = HiveContext(sc)

# Read the SQL Server table "orders" over JDBC.
# SQLContext.load() was deprecated in Spark 1.4 and removed in Spark 2.0, so the
# DataFrameReader API (read.format(...).options(...).load()) is used instead.
# NOTE(review): the URL carries placeholder credentials; never commit real ones.
df = (sqlctx.read
  .format("jdbc")
  .options(
    url="jdbc:sqlserver://ec2-54-244-44-6.us-west-2.compute.amazonaws.com:1433;database=sales;user=my_username;password=my_password",
    dbtable="orders")
  .load())


## this is how to write to an ORC file
# ORC format: An Intelligent Big Data file format for Hadoop and Hive

df.write.format("orc").save("/tmp/orc_query_output")

## this is how to write to a hive table
df.write.mode('overwrite').format('orc').saveAsTable("test_table")

### Read from Hive and write to JDBC Data Source (PostgreSQL table)
http://qiita.com/jlyoung/items/1103ae5f4ca7e05c8e2e 

In [None]:
from pyspark import SparkContext
from pyspark.sql import HiveContext

# NOTE(review): constructing a SparkContext raises ValueError when another context
# is already active in this kernel — stop that one first or use a fresh kernel.
sc = SparkContext("local", "pySpark Hive JDBC Demo App")
# Create a Hive Context
hive_context = HiveContext(sc)

# Read from the Hive table "crime" on the "default" Hive database. The result is a DataFrame.
# (print is called as a function: the Python 2 print statement is a syntax error on Python 3.)
print("Reading Hive table...")
crime = hive_context.table("default.crime")

# Register the DataFrame crime as a temporary view crime_temp.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
print("Registering DataFrame as a table...")
crime.createOrReplaceTempView("crime_temp")

# Execute an SQL query over this temporary view to get a list of thefts of property worth less than $500USD.
# The results will be another DataFrame.
print("Executing SQL SELECT query on DataFrame registered as a temporary table...")
pettythefts = hive_context.sql('SELECT * FROM crime_temp WHERE Primary_Type = "THEFT" AND Description = "$500 AND UNDER"')

# Create a new DataFrame containing only the columns we wish to write to the JDBC-connected data source.
print("Creating a DataFrame of only the columns of our resultset to be persisted to JDBC DataSource...")
pettythefts_table_df = pettythefts.select("id", "case_number", "primary_type", "description", "location_description", "beat", "district", "ward", "community_area")

# Prepare the connection properties for the JDBC datasource.
# The target table name is public.pettytheft; the <...> parts are placeholders to fill in.
mode = 'overwrite'
url = 'jdbc:postgresql://<database server IP address>:5432/postgres?searchpath=public'
properties = {"user": "<username>", "password": "<password>", "driver": "org.postgresql.Driver"}
table = 'public.pettytheft'

# Write the contents of the DataFrame to the JDBC datasource (Postgres) using the connection URL defined above.
print("Writing DataFrame to JDBC Datasource...")
pettythefts_table_df.write.jdbc(url=url, table=table, mode=mode, properties=properties)

print("Exiting...")