In [12]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName('Basics').getOrCreate()

In [13]:
df= spark.read.json("people.json")

In [14]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [15]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [16]:
# to get the column names
df.columns

['age', 'name']

In [17]:
#describe gives the statistic summary of the numeric column
df.describe()

DataFrame[summary: string, age: string, name: string]

In [20]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [22]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [31]:
dataschema= [StructField('age', IntegerType(), True) , StructField('name', StringType(), True)]
# above statement tell to call an class instance to read age as integer type and name as string type
# We enter 'True' to allow it take null values esle will give an error
finalstruct=StructType(fields=dataschema)
df=spark.read.json("people.json", schema=finalstruct)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [32]:
type(df['age'])

pyspark.sql.column.Column

In [33]:
# to see the content of the spark dataframe, use Select statement
df.select('age')

DataFrame[age: int]

In [34]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [35]:
# to see top 2 rows
df.head(2)
# output is list of row objects

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [36]:
# to see only first row
df.head(2)[0]

Row(age=None, name='Michael')

In [37]:
# to see the contents of multiple columns pass in the name of columns as a list 
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [38]:
# To add a new column
df.withColumn('New age', df['age']).show()
# pass new name followed by any column whose values you would like to use 

+----+-------+-------+
| age|   name|New age|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     30|
|  19| Justin|     19|
+----+-------+-------+



In [39]:
df.withColumn('Double age', df['age']*2).show()

+----+-------+----------+
| age|   name|Double age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [42]:
# To change the name of existing column
df.withColumnRenamed('age', 'New name').show()

+--------+-------+
|New name|   name|
+--------+-------+
|    null|Michael|
|      30|   Andy|
|      19| Justin|
+--------+-------+



In [49]:
# To create view 
df.createOrReplaceTempView('Myview')
# spacing not allowed while storing view name 
results= spark.sql("SELECT * FROM Myview")
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [51]:
new_result=spark.sql("SELECT * FROM Myview WHERE age= 30")
new_result.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [60]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('ops').getOrCreate()

In [64]:
#inferSchema allows the column to be infered as their respective date type - it automatically gets to knnow if integer or string
#etc.
df1= spark.read.csv('appl_stock.csv', inferSchema=True, header=True)
# header = TRUE treats first coulmn is column headings
df1.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [74]:
df1.head(5)[0]


Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [75]:
df.filter("Close>212").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-19 00:00:00|        208.330002|215.18999900000003|        207.240004|        215.039995|182501900|27.860484999999997|
|2010-03-05 00:00:00|        214.940006|219.69999500000003|214.62999900000003|218.95000499999998|224905100|28.367064000000003|
|2010-03-08 00:00:00|220.01000200000001|        220.090004|        218.250002|        219.079994|107472400|    

In [77]:
df1.filter(df1['Close']>212).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-19 00:00:00|        208.330002|215.18999900000003|        207.240004|        215.039995|182501900|27.860484999999997|
|2010-03-05 00:00:00|        214.940006|219.69999500000003|214.62999900000003|218.95000499999998|224905100|28.367064000000003|
|2010-03-08 00:00:00|220.01000200000001|        220.090004|        218.250002|        219.079994|107472400|    

In [78]:
# to select only few columns based on the filtered data 
df1.filter(df1['Close']>212).select(['High', 'Low']).show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        214.499996|212.38000099999996|
|        215.589994|        213.249994|
|215.18999900000003|        207.240004|
|219.69999500000003|214.62999900000003|
|        220.090004|        218.250002|
|        224.999996|        217.889994|
|225.48000699999997|223.19999500000003|
|        225.500008|        223.319998|
|227.72999199999998|            225.75|
|        225.500008|        220.249994|
|        224.979996|        222.510006|
|226.44998900000002|223.26999700000002|
|        224.999996|222.60999500000003|
|        225.240002|221.23000299999998|
|        225.999992|        220.150005|
|        228.780003|        224.100002|
|        230.200008|227.50998700000002|
|        230.970013|        226.250011|
|        231.950008|228.55001099999998|
|233.86999900000004|        231.619987|
+------------------+------------------+
only showing top 20 rows



In [79]:
#some errors we might do 
df1.filter(df1['Close']>212 and df1['Close']<230).show()
# as per the error, we cant use ' and ' ' or ' but rather operators and each syntax in brackets

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [86]:
df1.filter((df1['Close']>212) & (df1['Close']<230)).select(["Low", "High"]).show()
# the sequence in which column names are mentioned in select statement, it follows them 

+------------------+------------------+
|               Low|              High|
+------------------+------------------+
|212.38000099999996|        214.499996|
|        213.249994|        215.589994|
|        207.240004|215.18999900000003|
|214.62999900000003|219.69999500000003|
|        218.250002|        220.090004|
|        217.889994|        224.999996|
|223.19999500000003|225.48000699999997|
|        223.319998|        225.500008|
|            225.75|227.72999199999998|
|        220.249994|        225.500008|
|        222.510006|        224.979996|
|223.26999700000002|226.44998900000002|
|222.60999500000003|        224.999996|
|221.23000299999998|        225.240002|
|        220.150005|        225.999992|
|        224.100002|        228.780003|
|227.50998700000002|        230.200008|
|        226.250011|        230.970013|
+------------------+------------------+



In [90]:
result1=df1.filter(df1['Low']==218.25002).show()
result1

+----+----+----+---+-----+------+---------+
|Date|Open|High|Low|Close|Volume|Adj Close|
+----+----+----+---+-----+------+---------+
+----+----+----+---+-----+------+---------+



In [93]:
#To get the result as a list (row), use collect() instead of show(). This allows to apply operations on the result later on.
result2=df1.filter(df1['Low']==225.75).collect()
result2

[Row(Date=datetime.datetime(2010, 3, 12, 0, 0), Open=227.37001, High=227.72999199999998, Low=225.75, Close=226.600006, Volume=104080900, Adj Close=29.358195000000002)]

In [94]:
# to see particular columns result 
result2[0](1)

Row(2010-03-12 00:00:00=1)

In [98]:
#Better way is to store as dictionary. While storing as dictionary, need to give index of result for storing
result2[0].asDict()

{'Date': datetime.datetime(2010, 3, 12, 0, 0),
 'Open': 227.37001,
 'High': 227.72999199999998,
 'Low': 225.75,
 'Close': 226.600006,
 'Volume': 104080900,
 'Adj Close': 29.358195000000002}

In [99]:
row=result2[0]

In [102]:
row.asDict()['High']

227.72999199999998