<a href="https://colab.research.google.com/github/Savvy-Slowley/Big-Data-Cloud/blob/main/Spark_Basic_Syntax.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 60 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 70.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=916d687578ab201f4a706914ae85b4abf90560dc6d2918a978a1ed99e08d612c
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


# Basics

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start a SparkSession named "Operations", or reuse one that already exists.
# Note: this can take a while when run locally.
spark = (
    SparkSession.builder
    .appName("Operations")
    .getOrCreate()
)

In [None]:
# Echo the session object to confirm it was created.
spark

In [None]:
# Read the CSV with default options: no header handling and no type inference,
# so columns are auto-named _c0, _c1, ... and every column is a string.
df_pysprak = spark.read.csv('/content/ContainsNull.csv')

In [None]:
# Evaluating the DataFrame shows only its column names and types, not its rows.
df_pysprak

DataFrame[_c0: string, _c1: string, _c2: string]

In [None]:
# Print the rows; the file's header line (Id, Name, Sales) shows up as ordinary data here.
df_pysprak.show()

+----+-----+-----+
| _c0|  _c1|  _c2|
+----+-----+-----+
|  Id| Name|Sales|
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [None]:
# Re-read the file, this time treating the first line as column names and
# letting Spark infer each column's type.
# Equivalent: spark.read.option("header",True).csv("/content/ContainsNull.csv",inferSchema=True)
df_pysprak = spark.read.csv(
    "/content/ContainsNull.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Columns now carry the header names, and Sales was inferred as double.
df_pysprak

DataFrame[Id: string, Name: string, Sales: double]

In [None]:
# The header line is no longer part of the data.
df_pysprak.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [None]:
# Confirm the object is a PySpark DataFrame.
type(df_pysprak)

pyspark.sql.dataframe.DataFrame

In [None]:
# head(n) returns the first n rows as a list of Row objects.
df_pysprak.head(3)

[Row(Id='emp1', Name='John', Sales=None),
 Row(Id='emp2', Name=None, Sales=None),
 Row(Id='emp3', Name=None, Sales=345.0)]

In [None]:
# Print each column's name, type, and nullability (comparable to pandas' df.info()).
df_pysprak.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [None]:
# Column names as a plain Python list.
df_pysprak.columns

['Id', 'Name', 'Sales']

In [None]:
# Project just the Name and Sales columns.
df_pysprak.select('Name', 'Sales').show()

+-----+-----+
| Name|Sales|
+-----+-----+
| John| null|
| null| null|
| null|345.0|
|Cindy|456.0|
+-----+-----+



In [None]:
# (column name, type name) pairs.
df_pysprak.dtypes

[('Id', 'string'), ('Name', 'string'), ('Sales', 'double')]

In [None]:
# Summary statistics per column. Nulls are excluded from count, and
# mean/stddev come back null for the string columns.
df_pysprak.describe().show()

+-------+----+-----+-----------------+
|summary|  Id| Name|            Sales|
+-------+----+-----+-----------------+
|  count|   4|    2|                2|
|   mean|null| null|            400.5|
| stddev|null| null|78.48885271170677|
|    min|emp1|Cindy|            345.0|
|    max|emp4| John|            456.0|
+-------+----+-----+-----------------+



In [None]:
# Append a derived column holding Sales reduced by 100; rows with a null
# Sales get a null in the new column as well.
df_pysprak = df_pysprak.withColumn(
    'Sales minus 100',
    df_pysprak.Sales - 100,
)
df_pysprak.show()

+----+-----+-----+---------------+
|  Id| Name|Sales|Sales minus 100|
+----+-----+-----+---------------+
|emp1| John| null|           null|
|emp2| null| null|           null|
|emp3| null|345.0|          245.0|
|emp4|Cindy|456.0|          356.0|
+----+-----+-----+---------------+



In [None]:
# Drop the derived column again, leaving the original three columns.
df_pysprak = df_pysprak.drop('Sales minus 100')
df_pysprak.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [None]:
# Rename the 'Id' column to 'Employee ID'; the row data is unchanged.
df_pysprak = df_pysprak.withColumnRenamed('Id', 'Employee ID')
df_pysprak.show()

+-----------+-----+-----+
|Employee ID| Name|Sales|
+-----------+-----+-----+
|       emp1| John| null|
|       emp2| null| null|
|       emp3| null|345.0|
|       emp4|Cindy|456.0|
+-----------+-----+-----+



In [None]:
# Keep only the rows that have no null in any column
# (this removes rows, not columns).
df_pysprak_nona = df_pysprak.dropna(how='any')
df_pysprak_nona.show()

+-----------+-----+-----+
|Employee ID| Name|Sales|
+-----------+-----+-----+
|       emp4|Cindy|456.0|
+-----------+-----+-----+



In [None]:
# Replace nulls in the Name column with the placeholder 'Peter';
# nulls in other columns (e.g. Sales) are left as they are.
df_pysprak = df_pysprak.fillna('Peter', subset=['Name'])
df_pysprak.show()

+-----------+-----+-----+
|Employee ID| Name|Sales|
+-----------+-----+-----+
|       emp1| John| null|
|       emp2|Peter| null|
|       emp3|Peter|345.0|
|       emp4|Cindy|456.0|
+-----------+-----+-----+



In [None]:
# Filter using a SQL expression string; rows whose Sales is null do not match.
df_pysprak.filter("Sales > 350").show()

+-----------+-----+-----+
|Employee ID| Name|Sales|
+-----------+-----+-----+
|       emp4|Cindy|456.0|
+-----------+-----+-----+



In [None]:
# Rows whose Name is 'Peter', showing only the Name and Sales columns.
(
    df_pysprak
    .filter("Name == 'Peter'")
    .select("Name", "Sales")
    .show()
)

+-----+-----+
| Name|Sales|
+-----+-----+
|Peter| null|
|Peter|345.0|
+-----+-----+



In [None]:
# Same kind of filter written with Column expressions instead of a SQL string:
# Name is not 'Peter' AND Sales is above 300. Each comparison is parenthesised
# because & binds tighter than the comparison operators in Python.
(
    df_pysprak
    .filter((df_pysprak['Name'] != 'Peter') & (df_pysprak['Sales'] > 300))
    .select("Name", "Sales")
    .show()
)

+-----+-----+
| Name|Sales|
+-----+-----+
|Cindy|456.0|
+-----+-----+



In [None]:
# Average per Name group. No column is passed to avg(); in the result only
# the numeric Sales column is averaged. John's only Sales value is null, so
# his average is null.
df_pysprak.groupBy('Name').avg().show()

+-----+----------+
| Name|avg(Sales)|
+-----+----------+
|Cindy|     456.0|
| John|      null|
|Peter|     345.0|
+-----+----------+



In [None]:
# Number of rows per Name group ('Peter' has 2 because of the earlier null fill).
df_pysprak.groupBy('Name').count().show()

+-----+-----+
| Name|count|
+-----+-----+
|Cindy|    1|
| John|    1|
|Peter|    2|
+-----+-----+



In [None]:
# Load a JSON file and let Spark infer the schema.
# NOTE(review): relative path — assumes people.json is in the working directory.
df = spark.read.json('people.json')

In [None]:
# With schema inference, age comes back as long.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [None]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [None]:
# Explicit field definitions: column name, Spark type, and whether nulls are allowed.
data_schema = [
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
]

In [None]:
# Wrap the field list in a StructType so it can be passed as a reader schema.
final_struc = StructType(data_schema)

In [None]:
# Re-read the file, applying the explicit schema instead of inferring one.
df = spark.read.json('people.json', schema=final_struc)

In [None]:
# age is now integer (as declared in the schema) rather than long.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [None]:
# First row of the DataFrame. first() asks for a single row directly instead of
# fetching three rows and discarding two, and it returns None (rather than
# raising IndexError) when the DataFrame is empty.
df.first()

Row(age=None, name='Michael')

In [None]:
# Register the DataFrame as a temporary view named 'people' so it can be
# queried with SQL; an existing view of the same name is replaced.
df.createOrReplaceTempView('people')

In [None]:
# Run a SQL query against the 'people' view.
results = spark.sql("SELECT * FROM people")

In [None]:
# Display the query result.
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [None]:
# Filtering in SQL: only rows whose age equals 30.
spark.sql("SELECT * FROM people WHERE age=30").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [None]:
# collect() brings the matching rows back as a Python list of Row objects.
result = spark.sql("SELECT * FROM people WHERE age=30").collect()

In [None]:
# Each collected element is a pyspark Row.
type(result[0])

pyspark.sql.types.Row

In [None]:
# The first (and here only) matching row.
result[0]

Row(age=30, name='Andy')

In [None]:
# Keep a handle on the first row for the cells below.
row = result[0]

In [None]:
# Convert the Row to a plain dict keyed by column name.
row.asDict()

{'age': 30, 'name': 'Andy'}

In [None]:
# Look up a single field of the Row by its column name.
row['name']

'Andy'

In [None]:
# Iterating a Row yields its field values in column order.
for item in result[0]:
    print(item)

30
Andy


In [None]:
# Dict-style aggregation ({column: function}): the minimum age.
# The null age is ignored, so the result is 19.
df.agg({'age':'min'}).show()

+--------+
|min(age)|
+--------+
|      19|
+--------+



In [None]:
from pyspark.sql.functions import countDistinct

In [None]:
# Count distinct ages, labelling the result column 'COUNT'.
# The null age is not counted, so the three rows give 2.
df.select(countDistinct('age').alias('COUNT')).show()

+-----+
|COUNT|
+-----+
|    2|
+-----+

