<a href="https://colab.research.google.com/github/Sam-Ny/PySpark/blob/main/Pyspark_basics_csv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=579ce016eba1bc7c1655a58562c458eca71488e1464d91777d6ff81a6f83b941
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# To load and analyse the fakefriends.csv data | Data Frame reader and writer.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as func

In [3]:
#Creating the SparkSession
spark_fakefriends = SparkSession.builder.appName("FirstApp").getOrCreate()

In [4]:
#Defining schema for your Dataframe
myschema = StructType([\
                       StructField("userID", IntegerType(),True), #True means is nullable
                       StructField("name", StringType(),True),
                       StructField("age", IntegerType(),True),
                       StructField("friends", IntegerType(),True)
                       ])

In [5]:
#Creating Dataframe on a CSV file
people = spark_fakefriends.read.format("CSV")\
      .schema(myschema)\
      .option("path","/content/fakefriends.csv")\
      .load()

people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [6]:
#Performing all the transformations
output=people.select(people.userID,people.name,people.age,people.friends).where(people.age<30).withColumn('insert_timestamp',func.current_timestamp()).orderBy(people.userID).cache()

In [7]:
#taking the count of o/p dataframe
output.show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|    insert_timestamp|
+------+--------+---+-------+--------------------+
|     1|Jean-Luc| 26|      2|2024-02-19 07:08:...|
|     9|    Hugh| 27|    181|2024-02-19 07:08:...|
|    16|  Weyoun| 22|    323|2024-02-19 07:08:...|
|    21|   Miles| 19|    268|2024-02-19 07:08:...|
|    24|  Julian| 25|      1|2024-02-19 07:08:...|
|    25|     Ben| 21|    445|2024-02-19 07:08:...|
|    26|  Julian| 22|    100|2024-02-19 07:08:...|
|    32|     Nog| 26|    281|2024-02-19 07:08:...|
|    35| Beverly| 27|    305|2024-02-19 07:08:...|
|    46|    Morn| 25|     96|2024-02-19 07:08:...|
|    47|   Brunt| 24|     49|2024-02-19 07:08:...|
|    48|     Nog| 20|      1|2024-02-19 07:08:...|
|    52| Beverly| 19|    269|2024-02-19 07:08:...|
|    54|   Brunt| 19|      5|2024-02-19 07:08:...|
|    60|  Geordi| 20|    100|2024-02-19 07:08:...|
|    66|  Geordi| 21|    477|2024-02-19 07:08:...|
|    72|  Kasidy| 22|    179|20

In [8]:
#Creating a Temp View
output.createOrReplaceTempView("peoples")

In [9]:
#Running a simple Spark SQL query
spark_fakefriends.sql("select userId,name,age,friends,insert_timestamp from peoples").show()

+------+--------+---+-------+--------------------+
|userId|    name|age|friends|    insert_timestamp|
+------+--------+---+-------+--------------------+
|     1|Jean-Luc| 26|      2|2024-02-19 07:08:...|
|     9|    Hugh| 27|    181|2024-02-19 07:08:...|
|    16|  Weyoun| 22|    323|2024-02-19 07:08:...|
|    21|   Miles| 19|    268|2024-02-19 07:08:...|
|    24|  Julian| 25|      1|2024-02-19 07:08:...|
|    25|     Ben| 21|    445|2024-02-19 07:08:...|
|    26|  Julian| 22|    100|2024-02-19 07:08:...|
|    32|     Nog| 26|    281|2024-02-19 07:08:...|
|    35| Beverly| 27|    305|2024-02-19 07:08:...|
|    46|    Morn| 25|     96|2024-02-19 07:08:...|
|    47|   Brunt| 24|     49|2024-02-19 07:08:...|
|    48|     Nog| 20|      1|2024-02-19 07:08:...|
|    52| Beverly| 19|    269|2024-02-19 07:08:...|
|    54|   Brunt| 19|      5|2024-02-19 07:08:...|
|    60|  Geordi| 20|    100|2024-02-19 07:08:...|
|    66|  Geordi| 21|    477|2024-02-19 07:08:...|
|    72|  Kasidy| 22|    179|20

In [10]:
output.write\
.format("CSV")\
.mode("overwrite")\
.option('path','/content/spark-warehouse/')\
.bucketBy(4,'age')\
.saveAsTable('bucketed_fakefreinds')

In [None]:
bucketed_fakefreinds_df = spark_fakefriends.sql('select * from bucketed_fakefreinds')
bucketed_fakefreinds_df.show(1000)

# To load and analyse the operations_management.csv data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

In [None]:
spark_operations = SparkSession.builder.appName('operations_management data analisation').getOrCreate()

In [None]:
print(spark_operations.version)

3.5.0


In [None]:
data_frame = spark_operations.read.format('CSV').\
option('inferSchema','true').\
option('header','true').\
option('path','/content/operations_management.csv').\
load()

In [None]:
data_frame.printSchema()

root
 |-- description: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- size: string (nullable = true)
 |-- line_code: string (nullable = true)
 |-- value: integer (nullable = true)



In [None]:
data_frame_2 = data_frame.select('industry','value').\
where(data_frame.value > 10000).\
orderBy(desc('value'))

In [None]:
data_frame_2.printSchema()

root
 |-- industry: string (nullable = true)
 |-- value: integer (nullable = true)



In [None]:
data_frame_2.show(5)

+--------+-----+
|industry|value|
+--------+-----+
|   total|41091|
|   total|40431|
|   total|33984|
|   total|33750|
|   total|32652|
+--------+-----+
only showing top 5 rows



In [None]:
# Or we can use filter instead of where clause to filter using value columns
data_frame_3 = data_frame.select('industry','value').\
filter((col('value') > 200) & (col('industry') != 'total')).\
orderBy(desc('value'))

In [None]:
data_frame_3.printSchema()

root
 |-- industry: string (nullable = true)
 |-- value: integer (nullable = true)



In [None]:
data_frame_3.show(5)

+--------------------+-----+
|            industry|value|
+--------------------+-----+
|        Construction| 6030|
|        Construction| 5904|
|        Construction| 5229|
|Accommodation & f...| 5058|
|        Construction| 4965|
+--------------------+-----+
only showing top 5 rows



In [None]:
# Creating a Temp View
data_frame_3.createOrReplaceTempView('data') #Here data is any name given to view

In [None]:
# To get the temp view data
spark_operations.sql('''select industry, value
from data
where value >200 and
industry !="total" order by value desc
''').show(5)

+--------------------+-----+
|            industry|value|
+--------------------+-----+
|        Construction| 6030|
|        Construction| 5904|
|        Construction| 5229|
|Accommodation & f...| 5058|
|        Construction| 4965|
+--------------------+-----+
only showing top 5 rows



# To Create Global and Section Scope view using operations_management.csv data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

In [None]:
spark_operations = SparkSession.builder.appName('operations_management data analisation').getOrCreate()

In [None]:
print(spark_operations.version)

3.5.0


In [None]:
data_frame = spark_operations.read.format('CSV').\
option('inferSchema','true').\
option('header','true').\
option('path','/content/operations_management.csv').\
load()

In [None]:
data_frame.printSchema()

root
 |-- description: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- size: string (nullable = true)
 |-- line_code: string (nullable = true)
 |-- value: integer (nullable = true)



In [None]:
data_frame.createOrReplaceGlobalTempView("test")

In [None]:
data_frame_4 = spark_operations.sql('select * from test')
data_frame_4.show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `test` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [test], [], false


In [None]:
# spark_operations.catalog.dropGlobalTempView('test')
spark_operations.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

In [None]:
# Assuming spark is your SparkSession object
global_temp_views = spark_operations.catalog.listTables("global_temp")

# Print the list of global temporary views
for view in global_temp_views:
    print(view)


Table(name='test', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True)


# PySpark UDF (User Defined Functions)

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import LongType
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

In [17]:
spark_udf = SparkSession.builder.appName('spark_udf').enableHiveSupport().getOrCreate()

In [21]:
def cubed(a:pd.Series) -> pd.Series:
  return a*a*a

In [22]:
# Registring the UDF with pandas_udf function
cubed_udf = pandas_udf(cubed, returnType=LongType())

In [25]:
x = pd.Series([1,2,3])
print(cubed(x))

0     1
1     8
2    27
dtype: int64


In [33]:
df = spark_udf.range(1,4)
df.select(col('id'), cubed_udf('id')).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



# Integrating MongoDB with PySpark


