In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('App').getOrCreate()

23/06/06 21:40:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
df = spark.read.json('iris.json')
df.show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|              [|       null|      null|       null|      null|   null|
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|
|           null|        1.5|       0.2|        5.0|       3.4| setosa|
|           null|        1.4|       0.2|        4.4|       2.9| setosa|
|           null|        1.5|       0.1|        4.9|       3.1| 

In [5]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- petalLength: double (nullable = true)
 |-- petalWidth: double (nullable = true)
 |-- sepalLength: double (nullable = true)
 |-- sepalWidth: double (nullable = true)
 |-- species: string (nullable = true)



In [6]:
df.columns

['_corrupt_record',
 'petalLength',
 'petalWidth',
 'sepalLength',
 'sepalWidth',
 'species']

In [7]:
df.describe().show()

23/06/06 21:52:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+---------------+------------------+------------------+------------------+-------------------+---------+
|summary|_corrupt_record|       petalLength|        petalWidth|       sepalLength|         sepalWidth|  species|
+-------+---------------+------------------+------------------+------------------+-------------------+---------+
|  count|              2|               150|               150|               150|                150|      150|
|   mean|           null|3.7580000000000027| 1.199333333333334| 5.843333333333335|  3.057333333333334|     null|
| stddev|           null|1.7652982332594662|0.7622376689603467|0.8280661279778637|0.43586628493669793|     null|
|    min|              [|               1.0|               0.1|               4.3|                2.0|   setosa|
|    max|              ]|               6.9|               2.5|               7.9|                4.4|virginica|
+-------+---------------+------------------+------------------+------------------+--------------

In [8]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [9]:
data_schema = [StructField('petalLength', IntegerType(),True),
               StructField('petalWidth', IntegerType(),True),
               StructField('sepalLength', IntegerType(),True),
               StructField('sepalWidth', IntegerType(),True),
               StructField('species', StringType(),True),]

In [10]:
final_struct = StructType(fields = data_schema)

In [11]:
new_df = spark.read.json('iris.json', schema=final_struct)
new_df.printSchema()

root
 |-- petalLength: integer (nullable = true)
 |-- petalWidth: integer (nullable = true)
 |-- sepalLength: integer (nullable = true)
 |-- sepalWidth: integer (nullable = true)
 |-- species: string (nullable = true)



In [12]:
new_df.select('species').show()

+-------+
|species|
+-------+
|   null|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
| setosa|
+-------+
only showing top 20 rows



In [13]:
df.withColumn('doubleSepalLength', df['sepalLength']*2).show()

+---------------+-----------+----------+-----------+----------+-------+-----------------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|doubleSepalLength|
+---------------+-----------+----------+-----------+----------+-------+-----------------+
|              [|       null|      null|       null|      null|   null|             null|
|           null|        1.4|       0.2|        5.1|       3.5| setosa|             10.2|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|              9.8|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|              9.4|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|              9.2|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|             10.0|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|             10.8|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|              9.2|
|         

In [14]:
df.createOrReplaceTempView('iris')

sepal_greater_5 = spark.sql("SELECT * from iris WHERE sepalLength > 5")
sepal_greater_5.show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.5|       0.2|        5.4|       3.7| setosa|
|           null|        1.2|       0.2|        5.8|       4.0| setosa|
|           null|        1.5|       0.4|        5.7|       4.4| setosa|
|           null|        1.3|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        5.1|       3.5| setosa|
|           null|        1.7|       0.3|        5.7|       3.8| setosa|
|           null|        1.5|       0.3|        5.1|       3.8| setosa|
|           null|        1.7|       0.2|        5.4|       3.4| setosa|
|           null|        1.5|       0.4|        5.1|       3.7| 

In [22]:
df.filter("species == 'versicolor'" and "petalLength > 5").select(['sepalLength', 'petalLength']).show()

+-----------+-----------+
|sepalLength|petalLength|
+-----------+-----------+
|        6.0|        5.1|
|        6.3|        6.0|
|        5.8|        5.1|
|        7.1|        5.9|
|        6.3|        5.6|
|        6.5|        5.8|
|        7.6|        6.6|
|        7.3|        6.3|
|        6.7|        5.8|
|        7.2|        6.1|
|        6.5|        5.1|
|        6.4|        5.3|
|        6.8|        5.5|
|        5.8|        5.1|
|        6.4|        5.3|
|        6.5|        5.5|
|        7.7|        6.7|
|        7.7|        6.9|
|        6.9|        5.7|
|        7.7|        6.7|
+-----------+-----------+
only showing top 20 rows



In [31]:
df.filter((df['sepalLength'] < 10) & (df['petalLength'] < 10)).show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|
|           null|        1.5|       0.2|        5.0|       3.4| setosa|
|           null|        1.4|       0.2|        4.4|       2.9| setosa|
|           null|        1.5|       0.1|        4.9|       3.1| setosa|
|           null|        1.5|       0.2|        5.4|       3.7| 

In [41]:
df.head(3)[2]

Row(_corrupt_record=None, petalLength=1.4, petalWidth=0.2, sepalLength=4.9, sepalWidth=3.0, species='setosa')

In [42]:
results = df.filter(df['sepalLength'] == 4.8).collect()
results

[Row(_corrupt_record=None, petalLength=1.6, petalWidth=0.2, sepalLength=4.8, sepalWidth=3.4, species='setosa'),
 Row(_corrupt_record=None, petalLength=1.4, petalWidth=0.1, sepalLength=4.8, sepalWidth=3.0, species='setosa'),
 Row(_corrupt_record=None, petalLength=1.9, petalWidth=0.2, sepalLength=4.8, sepalWidth=3.4, species='setosa'),
 Row(_corrupt_record=None, petalLength=1.6, petalWidth=0.2, sepalLength=4.8, sepalWidth=3.1, species='setosa'),
 Row(_corrupt_record=None, petalLength=1.4, petalWidth=0.3, sepalLength=4.8, sepalWidth=3.0, species='setosa')]

In [44]:
row = results[3]

In [46]:
row.asDict()['petalLength']

1.6

In [48]:
df.groupBy('species')

<pyspark.sql.group.GroupedData at 0x11f1b3160>

In [51]:
df.groupBy('species').count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|      null|    2|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



In [54]:
df.agg({'sepalLength': 'max'}).show()

+----------------+
|max(sepalLength)|
+----------------+
|             7.9|
+----------------+



In [55]:
group_data = df.groupBy('species')

In [56]:
group_data.agg({'sepalLength':'max'}).show()

+----------+----------------+
|   species|max(sepalLength)|
+----------+----------------+
| virginica|             7.9|
|      null|            null|
|versicolor|             7.0|
|    setosa|             5.8|
+----------+----------------+



In [57]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [61]:
df.select(countDistinct('sepalLength').alias('Distinct Sepal Length')).show()

+---------------------+
|Distinct Sepal Length|
+---------------------+
|                   35|
+---------------------+



In [64]:
from pyspark.sql.functions import format_number

avg_sepal = df.select(avg('sepalLength').alias('Average Sepal Length'))
avg_sepal.select(format_number('Average Sepal Length',2).alias('Avg Sepal Length')).show()                      

+----------------+
|Avg Sepal Length|
+----------------+
|            5.84|
+----------------+



In [65]:
df.orderBy('petalLength').show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|              [|       null|      null|       null|      null|   null|
|              ]|       null|      null|       null|      null|   null|
|           null|        1.0|       0.2|        4.6|       3.6| setosa|
|           null|        1.1|       0.1|        4.3|       3.0| setosa|
|           null|        1.2|       0.2|        5.8|       4.0| setosa|
|           null|        1.2|       0.2|        5.0|       3.2| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.3|       0.4|        5.4|       3.9| setosa|
|           null|        1.3|       0.2|        4.4|       3.2| setosa|
|           null|        1.3|       0.2|        5.5|       3.5| setosa|
|           null|        1.3|       0.2|        4.4|       3.0| 

In [68]:
df.orderBy(df['petalLength'].desc()).show()

+---------------+-----------+----------+-----------+----------+---------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|  species|
+---------------+-----------+----------+-----------+----------+---------+
|           null|        6.9|       2.3|        7.7|       2.6|virginica|
|           null|        6.7|       2.2|        7.7|       3.8|virginica|
|           null|        6.7|       2.0|        7.7|       2.8|virginica|
|           null|        6.6|       2.1|        7.6|       3.0|virginica|
|           null|        6.4|       2.0|        7.9|       3.8|virginica|
|           null|        6.3|       1.8|        7.3|       2.9|virginica|
|           null|        6.1|       2.5|        7.2|       3.6|virginica|
|           null|        6.1|       1.9|        7.4|       2.8|virginica|
|           null|        6.1|       2.3|        7.7|       3.0|virginica|
|           null|        6.0|       2.5|        6.3|       3.3|virginica|
|           null|        6.0|       1.