Creating a Spark Session

In [1]:
from pyspark.sql import SparkSession

Starting the Spark session

In [2]:
# Build a session for this notebook under the application name 'Operations';
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName('Operations')
    .getOrCreate()
)

Loading and viewing the data

In [5]:
# Path to the CSV file holding the stock data
filePath = 'appl_stock.csv'

# Use the first line as column names and let Spark infer each column's type
# instead of reading every field as a string.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(filePath)
)

# Print the leading rows as a text table
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

View the schema of the data

In [6]:
# Print each column's name, inferred type and nullability as a tree
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



View statistical properties of the numerical data using describe()

In [7]:
# describe() builds a summary DataFrame (count, mean, stddev, min, max per column);
# show() prints it.
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
+-------

Filtering Data

In [8]:
# Register df as a temporary view named 'stock' so the SQL queries below can
# refer to it with `FROM stock`.
df.createOrReplaceTempView('stock')

In [11]:
# Get only records that have a Close below 500
query1 = 'SELECT * FROM stock where close < 500'
# show() only prints and returns None, so keep the DataFrame in result1
# and display it with a separate call.
result1 = spark.sql(query1)
result1.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [12]:
# Get only records that have a Close below 500, returning just the
# High, Low, Close and Open columns
query2 = 'SELECT high, low, close, open FROM stock where close < 500'
# show() only prints and returns None, so keep the DataFrame in result2
# and display it with a separate call.
result2 = spark.sql(query2)
result2.show()

+------------------+------------------+------------------+------------------+
|              high|               low|             close|              open|
+------------------+------------------+------------------+------------------+
|        214.499996|212.38000099999996|        214.009998|        213.429998|
|        215.589994|        213.249994|        214.379993|        214.599998|
|            215.23|        210.750004|        210.969995|        214.379993|
|        212.000006|        209.050005|            210.58|            211.75|
|        212.000006|209.06000500000002|211.98000499999998|        210.299994|
|        213.000002|        208.450005|210.11000299999998|212.79999700000002|
|209.76999500000002|        206.419998|        207.720001|209.18999499999998|
|210.92999500000002|        204.099998|        210.650002|        207.870005|
|210.45999700000002|        209.020004|            209.43|210.11000299999998|
|211.59999700000003|        205.869999|            205.93|210.92

In [14]:
# Get only records where Close is less than 200 and Open is greater than 200
query3 = 'SELECT * FROM stock where close < 200 and open > 200'
# show() only prints and returns None, so keep the DataFrame in result3
# and display it with a separate call.
result3 = spark.sql(query3)
result3.show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [15]:
# Get only records where Close is less than 200 or Open is greater than 200
query4 = 'SELECT * FROM stock where close < 200 or open > 200'
# show() only prints and returns None, so keep the DataFrame in result4
# and display it with a separate call.
result4 = spark.sql(query4)
result4.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [16]:
# Get only records where Close is less than 200 and Open is not greater than 200
query5 = 'SELECT * FROM stock where close < 200 and  not open >  200'
# show() only prints and returns None, so keep the DataFrame in result5
# and display it with a separate call.
result5 = spark.sql(query5)
result5.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:00|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:00|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05 00:00:00|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.3

Using collect() to retrieve the rows as Python objects

In [17]:
# Same filter as the previous query (Close below 200, Open not above 200),
# but collect() returns the matching rows as a Python list of Row objects
# instead of printing them.
query5 = 'SELECT * FROM stock where close < 200 and  not open >  200'
result5 = (
    spark.sql(query5)
    .collect()
)

In [19]:
# First collected row; displayed as a Row with one named field per column
result5[0]

Row(Date=datetime.datetime(2010, 2, 1, 0, 0), Open=192.36999699999998, High=196.0, Low=191.29999899999999, Close=194.729998, Volume=187469100, Adj Close=25.229131)

In [20]:
# Keep a handle on the first collected Row for the conversion below
testreesult = result5[0]

In [21]:
# Convert the Row into a plain dict keyed by column name
testreesult.asDict()

{'Date': datetime.datetime(2010, 2, 1, 0, 0),
 'Open': 192.36999699999998,
 'High': 196.0,
 'Low': 191.29999899999999,
 'Close': 194.729998,
 'Volume': 187469100,
 'Adj Close': 25.229131}

In [22]:
# Iterating over a Row yields its field values in column order;
# print each one on its own line.
for field_value in result5[0]:
    print(field_value)

2010-02-01 00:00:00
192.36999699999998
196.0
191.29999899999999
194.729998
187469100
25.229131
