<a href="https://colab.research.google.com/github/hirenbioinfo/google-colab-notebook/blob/main/colab_pyspark_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install PySpark

Installing PySpark is straightforward, but it has a few prerequisites — most importantly a Java runtime, because Spark runs on the JVM. PySpark is the Python API for Apache Spark, a fast, general-purpose cluster-computing framework for big-data processing.

In [None]:
#Here are the steps to install PySpark for Python from source

#Install Java
#!apt-get update
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and install Spark
#!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
#!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# Install PySpark
!pip install -q pyspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()
import pyspark
from pyspark.sql.import Sparksession
spark = SparkSession.builder.getOrCreate()
#spark

# Start SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [None]:
# Optional: legacy DStream (Spark Streaming) socket word-count setup.
# This runs locally, not on a cluster.
# NOTE(review): the DStream API is deprecated in recent Spark releases in
# favour of Structured Streaming (`spark.readStream`) — confirm before use.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Only one SparkContext may be active at a time, and the SparkSession created
# earlier already owns one, so `SparkContext("local[2]", ...)` would raise.
# getOrCreate() reuses the active context; the conf (two local worker threads)
# only applies when no context exists yet.
conf = SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
sc = SparkContext.getOrCreate(conf)

# Streaming context with a batch interval of 1 second
ssc = StreamingContext(sc, 1)

# DStream that connects to hostname:port, like localhost:9999.
# Nothing is consumed until ssc.start() is called.
lines = ssc.socketTextStream("localhost", 9999)

In [None]:
# on google colab
! pip install pyspark py4j #all you need

In [None]:
# Initiate a Spark session. getOrCreate() returns the already-active session
# if one exists, so the app name may not change on later runs.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('spark-dataframe-demo')
    .getOrCreate()
)
# Displaying the session shows the Spark version, among other details
spark

In [None]:
#import requests
#import pandas as pd
#from pyspark.sql import SparkSession
#from pyspark.sql import DataFrame

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
from google.colab import drive

drive.mount(mountpoint='/content/drive')

In [None]:
#Set Up PySpark:
#If you haven't set up PySpark yet, do the steps mentioned previously:

# #!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# #!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# # !tar xf spark-3.1.2-bin-hadoop3.2.tgz
# !pip install -q findspark

# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# import findspark
# findspark.init()


In [None]:
#Read the CSV File from Google Drive with PySpark:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder\
#   .appName('GoogleDriveCSV')\
#  .getOrCreate()


In [None]:
# Read the CSV file from Google Drive. The original CSV was downloaded
# manually and saved to Drive (the drive must be mounted first).
file_path = '/content/drive/My Drive/Udemy/original.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Called without arguments, show() prints the first 20 rows ...
df.show()
# ... while n limits the output, here to the first five rows
df.show(n=5)

In [None]:
# Column names paired with the types Spark inferred (read with inferSchema=True)
df.dtypes

In [None]:
# Explicit schema for the people CSV, so Spark does not have to infer types.
# Explicit imports instead of `import *` keep the namespace clean.
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('City', StringType(), True),
    StructField('JobTitle', StringType(), True),
    # Kept as text: a later cell strips '$' and ',' before casting to float
    StructField('Salary', StringType(), True),
    # Both coordinates are numeric. Latitude was StringType while Longitude
    # was FloatType; they now match.
    StructField('Latitude', FloatType(), True),
    StructField('Longitude', FloatType(), True),
])

In [None]:
# Re-read the same CSV, this time enforcing the explicit schema instead of inferring it
df2 = spark.read.schema(schema).option("header", True).csv(file_path)
df2.show()

In [None]:
# Print df2's schema as a tree; it reflects the explicit `schema` passed when reading
df2.printSchema()

In [None]:
# Summary statistics for each column of df2, printed as a table
df2.describe().show()
# pandas equivalent:
#print(df.describe())

In [None]:
# List of the column names. Only a cell's last expression is displayed, so a
# bare `df2.columns` here would be silently discarded; print it explicitly.
print(df2.columns)
# Total number of distinct (fully de-duplicated) rows
df2.distinct().count()

In [None]:
# Number of distinct values in the 'gender' column. This previously selected
# "City", contradicting both the variable name and the message printed later.
distinct_count_gender = df2.select("gender").distinct().count()
print(distinct_count_gender)

In [None]:
# Report the distinct-value count computed for the gender column
print(f"The number of unique values in the 'gender' column is: {distinct_count_gender}")

In [None]:
# Null counts: count(when(isnull(c), ...)) tallies the rows where column c is null
from pyspark.sql.functions import when, count, isnull, col

null_counts = df.agg(*(count(when(isnull(name), name)).alias(name) for name in df.columns))
null_counts.show()

# Null counts for two individual columns of interest
null_cities = df.where(col("City").isNull()).count()
null_job_titles = df.where(col("JobTitle").isNull()).count()
print(f"Entries with null cities: {null_cities}")
print(f"Entries with null job titles: {null_job_titles}")

In [None]:
# Handle null values by dropping every row that has a null in any column
# (dropna() with defaults is the same as na.drop())
df2_dropped = df2.dropna()
df2_dropped.show()

In [None]:
# Inspect missing values in a specific column (JobTitle).

# Rows whose JobTitle is null
df2_null_jobs = df2.filter(df2.JobTitle.isNull())
df2_null_jobs.show()

# Rows whose JobTitle is present. This previously reused the name
# `df2_null_jobs`, which then held the opposite of what its name says.
df2_non_null_jobs = df2.filter(df2.JobTitle.isNotNull())
df2_non_null_jobs.show()

In [None]:
# One boolean per row: True where JobTitle is null, False otherwise
df2.select(
    df2["JobTitle"].isNull().alias("isJobTitleNull")
).show()

In [None]:
# Filtering: Column expressions work the same as SQL-string predicates,
# and where() is an alias of filter()
df2.filter(df2.gender == 'Male').show()
df2.where(df2["gender"] == 'Female').show()

In [None]:
# Filtering on one condition, then on two combined with `&`. Each comparison
# needs its own parentheses because `&` binds tighter than `==` in Python.
df2.where(df2["City"] == "Bulgan").show()
df2.where((df2.City == "Bulgan") & (df2.gender == "Female")).show()

In [None]:
from pyspark.sql.functions import regexp_replace

# Remove every character from 'City' that is not a letter, a digit, or a space.
# \p{L} / \p{N} (Unicode classes in Spark's Java regex) keep accented and
# non-Latin letters, which the ASCII-only [a-zA-Z0-9] class would strip.
df_cleaned = df.withColumn("City", regexp_replace(df["City"], r"[^\p{L}\p{N} ]", ""))

df_cleaned.show()

In [None]:
# Histogram of salaries. Salary is text (StringType in the schema), so strip
# '$' and ',' and cast it to float before plotting.
import matplotlib.pyplot as plt
from pyspark.sql.functions import regexp_replace

df_cleaned_salary = df.withColumn(
    "Cleaned_Salary", regexp_replace(df["Salary"], "[$,]", "").cast("float")
)

# Missing salaries stay null after the cast and would reach matplotlib as
# None, so drop them. salary_data is also reused by the density-plot cell.
salary_data = [
    row["Cleaned_Salary"]
    for row in df_cleaned_salary.select("Cleaned_Salary").dropna().collect()
]

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(salary_data, bins=20, color='blue', edgecolor='black')
ax.set(title='Salary Distribution', xlabel='Salary', ylabel='Number of Employees')
ax.grid(True)
plt.show()

In [None]:
# Kernel density estimate of the cleaned salaries in `salary_data`.
# `shade=` is deprecated in seaborn (>= 0.11); `fill=` is its replacement.
import seaborn as sns
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 6))
sns.kdeplot(salary_data, fill=True, color='blue', ax=ax)
ax.set(title='Salary Density Plot', xlabel='Salary', ylabel='Density')
ax.grid(True)
plt.show()

In [None]:
# Bar chart of the number of rows per gender.
import matplotlib.pyplot as plt

gender_counts = df.groupBy("gender").count().collect()
# Rows with a null gender form their own group; give it a printable label
genders = [row['gender'] if row['gender'] is not None else 'Unknown' for row in gender_counts]
counts = [row['count'] for row in gender_counts]
# Must come after `genders` is defined (the original computed this first,
# which raises NameError on a fresh kernel)
colors = ['blue' if gender == 'Male' else 'pink' for gender in genders]

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(genders, counts, color=colors)
ax.set(title='Gender Distribution', xlabel='Gender', ylabel='Number of Individuals')
ax.grid(axis='y')
plt.show()

In [None]:
# Pie chart of the per-gender counts (`genders` and `counts` come from the bar-chart cell)
import matplotlib.pyplot as plt

# 'Male' slices are blue and every other category is pink; extend the mapping for more categories
colors = [{'Male': 'blue'}.get(gender, 'pink') for gender in genders]

fig, ax = plt.subplots(figsize=(8, 8))
ax.pie(counts, labels=genders, colors=colors, autopct='%1.1f%%', startangle=140)
ax.set_title('Gender Distribution')
plt.show()


In [None]:
# Number of distinct genders recorded in each city, largest first.
# countDistinct/desc were used here before being imported (they only arrived
# later through a wildcard import), so import them explicitly. The alias was
# 'country_count' although it counts genders.
from pyspark.sql.functions import countDistinct, desc

(
    df.groupBy('City')
    .agg(countDistinct('gender').alias('gender_count'))
    .orderBy(desc('gender_count'))
    .show()
)

In [None]:
# Acquire a local session on all available cores; getOrCreate() returns the
# already-active session when one exists.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Colab PySpark Session").getOrCreate()


In [None]:
# Colab's bundled sample datasets live in /content/sample_data; the previous
# path "//california_housing_test.csv" pointed at the filesystem root.
# NOTE: this rebinds `df` (and `file_path`), replacing the people data loaded earlier.
file_path = "/content/sample_data/california_housing_test.csv"

# Read the CSV file, inferring column types from the data
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first rows of the dataframe (20 by default)
df.show()


In [None]:
# Column names and inferred types of the housing DataFrame
df.dtypes

In [None]:
# First 15 rows, returned as a list of Row objects (not a formatted table)
df.head(15)

In [None]:
# Label each row "costly" or "cheap" depending on median_house_value.
from pyspark.sql.functions import when

# NOTE(review): if median_house_value is in dollars, 25,000 may classify almost
# every row as "costly" — confirm the intended threshold (perhaps 250,000?).
HOUSE_PRICE_THRESHOLD = 25000

df_with_classification = df.withColumn(
    "house_price_category",
    when(df["median_house_value"] > HOUSE_PRICE_THRESHOLD, "costly").otherwise("cheap"),
)

In [None]:
# describe is a method: without the call parentheses, `df.describe.show()`
# raises AttributeError ('function' object has no attribute 'show').
df.describe().show()

In [None]:
# Summary statistics for every column of the housing DataFrame
df.describe().show()

In [None]:
# Replace missing City / JobTitle values with 'unknown' in new columns.
# Uses df2 (the people data read with the explicit schema): by this point `df`
# has been rebound to the California housing data, which has neither column.
# Both columns are added in one chain; previously the second withColumn
# started again from the source frame and discarded Clean_City.
from pyspark.sql.functions import when

df_cleaned = (
    df2
    .withColumn("Clean_City", when(df2.City.isNull(), 'unknown').otherwise(df2.City))
    .withColumn("New_JobTitle", when(df2.JobTitle.isNull(), 'unknown').otherwise(df2.JobTitle))
)

In [None]:
# same as above
#from pyspark.sql.functions import *
#df_cleaned = df.withColumn("Clean_City", when(isnull(df["City"]), 'unknown').otherwise(df["City"]))

In [None]:
# Display df_cleaned, the output of the null-replacement step
df_cleaned.show()

In [None]:
# Keep one copy of each fully identical row (distinct() == dropDuplicates() with no subset)
df_no_duplicates = df.distinct()

In [None]:
# Display the de-duplicated DataFrame
df_no_duplicates.show()

In [None]:
# Mean of the population column, extracted from the one-row aggregate as a plain Python value
mean = df.agg({'population': 'avg'}).first()[0]

In [None]:
# Recompute the "costly"/"cheap" label from median_house_value with the same 25000 cut-off
df_with_classification = df.withColumn(
    "house_price_category",
    when(df.median_house_value > 25000, "costly").otherwise("cheap"),
)

In [None]:
# `mean` is a plain Python value pulled out of the aggregate row, not a
# DataFrame, so it has no .show(); display it as the cell's last expression.
mean

In [None]:
# Print the mean population value
print(mean)

In [None]:
# Column names of the housing DataFrame
df.columns

In [None]:
from pyspark.sql.functions import lit

In [None]:
import numpy as np  # NOTE(review): np is not used in this cell

# Single-column DataFrame holding just the latitude values
test = df.select(df.latitude)

In [None]:
# Display the single-column 'latitude' DataFrame
test.show()

In [None]:
# Namespaced import instead of `import *`, which shadows Python builtins such
# as sum/max/min/round with Spark column functions.
from pyspark.sql import functions as F

# The cell previously ended with an incomplete assignment (`mydf2=`), a
# SyntaxError that stopped "Run All" here.
# TODO: define `mydf2` once its intended transformation is known.

In [None]:
# Summary statistics for the location and house-size columns
df.describe(
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
).show()

In [None]:
# Summary statistics for the remaining numeric columns
df.describe(
    'population',
    'households',
    'median_income',
    'median_house_value',
).show()

In [None]:
# Check whether a column looks normally distributed: plot a histogram with a
# KDE overlay and report its skewness and kurtosis.
import seaborn as sns
from pyspark.sql import functions as F

# The original plotted median_income but asked for the kurtosis of a
# non-existent 'price' column; set this to "median_house_value" to check prices.
dist_col = "median_income"

# sns.distplot is deprecated; histplot(kde=True) is its replacement.
# Pass a Series (one column) rather than the whole pandas DataFrame.
sns.histplot(df.select(dist_col).toPandas()[dist_col], kde=True, color="skyblue")
df.select(F.skewness(dist_col), F.kurtosis(dist_col)).show()

In [None]:
import seaborn as sns

In [None]:
!pip install Seaborn

In [None]:
# Concatenate two string columns into a "full_name" column.
# pyspark is already installed by the first cell, so it is not reinstalled here.
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

# Initialize a Spark session (returns the active one if it already exists)
spark = SparkSession.builder.appName("ConcatenateColumns").getOrCreate()

# Sample data in its own DataFrame so the `df` loaded earlier is not overwritten
names_df = spark.createDataFrame(
    [("John", "Doe"), ("Jane", "Smith")], ["first_name", "last_name"]
)

# concat() glues the parts together with no separator ("JohnDoe"); concat_ws()
# inserts the given separator and also skips null parts instead of returning null.
df_with_fullname = names_df.withColumn(
    "full_name", concat_ws(" ", names_df["first_name"], names_df["last_name"])
)

df_with_fullname.show()
