Create a DataFrame from an RDD using the reflection (schema inference) method

Import findspark and initialize it with the Spark installation directory.
Then import pyspark.

In [None]:
import findspark
# Point findspark at the Spark installation directory. This runs before
# `import pyspark` (presumably so that pyspark can be located -- confirm the
# path matches the local Spark install).
findspark.init('/usr/local/spark')
import pyspark

Start SparkSession

In [None]:
from pyspark.sql import SparkSession

# Build the session used by every later cell, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL example")
    .getOrCreate()
)

In [None]:
from pyspark.sql import Row

In [None]:
# Keep a handle on the session's SparkContext; it is used below to read the
# text file as an RDD and, at the end of the notebook, to shut Spark down.
sc = spark.sparkContext

Create an RDD from the structured text file

In [None]:
# Location of the customer text file.
customers_path = "customers.txt"
# RDD of the file's text lines.
clines = sc.textFile(customers_path)

Transform this RDD of text lines into an RDD of `Row` objects, with each Row holding the 5 tab-separated fields of one input line.

In [None]:
# Split every text line on tab characters, giving one list of field strings per line.
cfields = clines.map(lambda line: line.split("\t"))

In [None]:
def to_customer_row(fields):
    """Build a customer Row from the list of fields of one input line.

    The first five entries of `fields` become cid, cname, ccity, cstate and
    czip, in that order.
    """
    # NOTE(review): the schema printed further down lists these columns
    # alphabetically, not in keyword order -- presumably Row sorts keyword
    # fields in this Spark version; confirm before relying on column order.
    return Row(
        cid=fields[0],
        cname=fields[1],
        ccity=fields[2],
        cstate=fields[3],
        czip=fields[4],
    )


customers = cfields.map(to_customer_row)

In [None]:
# Peek at the first three Rows to check that the fields were mapped as intended.
customers.take(3)

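Infer the schema by creating a DataFrame from the RDD of Rows. (The DataFrame is registered as a table, i.e. a temp view, further below, before the SQL queries are run.)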

In [13]:
# Build the DataFrame from the RDD of Rows; no explicit schema is passed, so
# the column names come from the Row fields (see the printed schema below).
customerDF = spark.createDataFrame(customers)

In [14]:
# Print the inferred column names and types.
customerDF.printSchema()

root
 |-- ccity: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- cname: string (nullable = true)
 |-- cstate: string (nullable = true)
 |-- czip: string (nullable = true)



In [None]:
# Project a single column, referring to it by name.
customerDF.select("cname").show()

In [None]:
# Project two columns, this time referring to them through customerDF[...] indexing.
customerDF.select(customerDF['cname'], customerDF['ccity']).show()

In [15]:
# Keep only the rows whose cstate equals 'CA' and display them.
customerDF.filter(customerDF['cstate'] == 'CA').show()

+---------------+-----+----------------+------+-----+
|          ccity|  cid|           cname|cstate| czip|
+---------------+-----+----------------+------+-----+
|        Modesto| 5577|      Mary Smith|    CA|95350|
|Rowland Heights| 1745|      Mary Smith|    CA|91748|
|      San Diego|11444|Kathleen Patrick|    CA|92109|
|          Indio| 8846|    Thomas Smith|    CA|92201|
|       El Cajon| 6237|  Bobby Anderson|    CA|92020|
|  Panorama City| 4085|       Mary Carr|    CA|91402|
|       Stockton| 8705|  Patricia Smith|    CA|95207|
| San Bernardino| 3669|       Mary Soto|    CA|92410|
|    Los Angeles| 6101|      Mary Smith|    CA|90033|
|  Laguna Niguel|11697|  Jessica Thomas|    CA|92677|
|       Winnetka| 1295|   Theresa Lopez|    CA|91306|
|    Simi Valley| 4814|     Paul Suarez|    CA|93065|
|       Highland| 8530|   William Smith|    CA|92346|
|        Ontario| 3846|    Ronald Lewis|    CA|91764|
|       Cerritos|10476|     John Hodges|    CA|90703|
|    Los Angeles|10243|  Don

In [16]:
# Count customers per state; the displayed output is limited to the top 20 rows.
customerDF.groupBy("cstate").count().show()

+------+-----+
|cstate|count|
+------+-----+
|    AZ|   19|
|    SC|    2|
|    LA|    7|
|    MN|    1|
|    NJ|   19|
|    DC|    4|
|    OR|    4|
|    VA|   14|
|    RI|    2|
|    KY|    1|
|    MI|   28|
|    NV|   16|
|    WI|    9|
|    ID|    2|
|    CA|  187|
|    CT|    8|
|    NC|   19|
|    MD|   19|
|    DE|    1|
|    MO|   13|
+------+-----+
only showing top 20 rows



Create a temp view so that SQL queries can be run against the DataFrame

In [17]:
# Expose the DataFrame to SQL under the name "customers" (replacing any
# existing temp view with that name).
customerDF.createOrReplaceTempView("customers")

In [18]:
# Per-state customer counts from the "customers" temp view, keeping only the
# states that have at least 50 customers.
state_count_query = "SELECT cstate, count(*) as sttcount FROM customers GROUP BY cstate HAVING sttcount>=50"
cStateCount50 = spark.sql(state_count_query)

In [19]:
# Display the states returned by the SQL query.
cStateCount50.show()

+------+--------+
|cstate|sttcount|
+------+--------+
|    CA|     187|
|    NY|      79|
|    TX|      62|
|    PR|     505|
+------+--------+



In [None]:
# Print the column names and types of the SQL query result.
cStateCount50.printSchema()

In [None]:
# Check which Python type spark.sql(...) returned.
type(cStateCount50)

In [20]:
# Shut Spark down through the SparkSession that was created with
# SparkSession.builder. Stopping the session also stops its underlying
# SparkContext (`sc`), so no separate sc.stop() call is needed.
spark.stop()