In [1]:
"""
 MySQL injection to Spark, using the Sakila sample database.


"""
import logging
from pyspark.sql import SparkSession

# Creates a session on a local master
spark = SparkSession.builder.appName("MySQL to Dataframe using a JDBC Connection") \
    .master("local[*]").getOrCreate()

user = "root"
password = "password"
use_ssl="false"
mysql_url = "jdbc:mysql://localhost:3306/testschema"
dbtable = "acct"
database="testschema"

df = spark.read.format("jdbc") \
    .options(url=mysql_url,
             database=database,
             dbtable=dbtable,
             user=user,
             password=password) \
    .load()

#df = df.orderBy(df.col("last_name"))

# Displays the dataframe and some of its metadata
df.show(5)
df.printSchema()

logging.info("The dataframe contains {} record(s).".format(df.count()))



+---+-----+
| id| name|
+---+-----+
|  1|Neyaz|
|  2| Nick|
|  3| hari|
|  4| john|
+---+-----+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [2]:
df.show(1)

+---+-----+
| id| name|
+---+-----+
|  1|Neyaz|
+---+-----+
only showing top 1 row



In [3]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [4]:
manual_schema = "'hello', STRINGTYPE, '23', INTEGERTYPE"

In [5]:
print(manual_schema)

'hello', STRINGTYPE, '23', INTEGERTYPE


In [6]:
print(sc)

NameError: name 'sc' is not defined

In [7]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fa7ba356730>


In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
manual_schema_2 = StructType([StructField("Name",StringType(), True), StructField("Age",IntegerType(), True)])

In [9]:

mydata = [("hello",20),("hi",21)]
mydf = spark.createDataFrame(mydata, manual_schema_2)

In [10]:
print(mydata)
mydf.show()

[('hello', 20), ('hi', 21)]
+-----+---+
| Name|Age|
+-----+---+
|hello| 20|
|   hi| 21|
+-----+---+



In [11]:
mydf.printSchema()
print(type(mydata))

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)

<class 'list'>


In [13]:
jsondf = spark.read.json("data/flight-data/json/2015-summary.json")
jsondf.printSchema

<bound method DataFrame.printSchema of DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]>

In [14]:
jsondf.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [23]:
from pyspark.sql.functions import expr, col, column
jsondf.select(expr("DEST_COUNTRY_NAME as DEST")).show()


+--------------------+
|                DEST|
+--------------------+
|       United States|
|       United States|
|       United States|
|               Egypt|
|       United States|
|       United States|
|       United States|
|          Costa Rica|
|             Senegal|
|             Moldova|
|       United States|
|       United States|
|              Guyana|
|               Malta|
|            Anguilla|
|             Bolivia|
|       United States|
|             Algeria|
|Turks and Caicos ...|
|       United States|
+--------------------+
only showing top 20 rows

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|       United States|
|       United States|
|       United States|
|               Egypt|
|       United States|
|       United States|
|       United States|
|          Costa Rica|
|             Senegal|
|             Moldova|
|       United States|
|       United States|
|              Guyana|
|               Malta|
|            Anguilla|
|       

In [25]:
jsondf.selectExpr("DEST_COUNTRY_NAME as DEST").show()

+--------------------+
|                DEST|
+--------------------+
|       United States|
|       United States|
|       United States|
|               Egypt|
|       United States|
|       United States|
|       United States|
|          Costa Rica|
|             Senegal|
|             Moldova|
|       United States|
|       United States|
|              Guyana|
|               Malta|
|            Anguilla|
|             Bolivia|
|       United States|
|             Algeria|
|Turks and Caicos ...|
|       United States|
+--------------------+
only showing top 20 rows



In [26]:
jsondf.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']