In [30]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *
import os

# Start (or reuse) a local Spark session named 'bbi' using the 'local[1]' master.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('bbi')
    .getOrCreate()
)

In [31]:
# Load the auto-mpg CSV, treating the first line as the header row.
# No schema is supplied or inferred here; the columns are cast in a later cell.
auto_df_fixed = (
    spark.read
    .options(header=True)
    .csv("/home/basu/data/auto-mpg-fixed.csv")
)

# Preview the rows, then list the column names and types.
auto_df_fixed.show()
auto_df_fixed.printSchema()

+----+---------+------------+----------+------+------------+---------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|
+----+---------+------------+----------+------+------------+---------+------+--------------------+
|18.0|        8|       307.0|     130.0| 3504.|        12.0|       70|     1|chevrolet chevell...|
|15.0|        8|       350.0|     165.0| 3693.|        11.5|       70|     1|   buick skylark 320|
|18.0|        8|       318.0|     150.0| 3436.|        11.0|       70|     1|  plymouth satellite|
|16.0|        8|       304.0|     150.0| 3433.|        12.0|       70|     1|       amc rebel sst|
|17.0|        8|       302.0|     140.0| 3449.|        10.5|       70|     1|         ford torino|
|15.0|        8|       429.0|     198.0| 4341.|        10.0|       70|     1|    ford galaxie 500|
|14.0|        8|       454.0|     220.0| 4354.|         9.0|       70|     1|    chevrolet impala|
|14.0|    

In [32]:
# Target Spark type for each column, listed in the order the casts are applied.
column_types = {
    "mpg": "double",
    "cylinders": "double",
    "displacement": "double",
    "horsepower": "double",
    "weight": "double",
    "acceleration": "double",
    "modelyear": "int",
    "origin": "int",
}

# Replace each column in place with its typed version (column order is unchanged).
for column_name, spark_type in column_types.items():
    auto_df_fixed = auto_df_fixed.withColumn(
        column_name, col(column_name).cast(spark_type)
    )

In [33]:
# Add an upper-cased copy of 'carname' as a new 'upper' column.
# show() only prints and returns None, so keep the DataFrame in `df` first and
# display it in a separate step; chaining .show() into the assignment would
# leave `df` bound to None.
df = auto_df_fixed.withColumn('upper', upper(col('carname')))
df.show()

+----+---------+------------+----------+------+------------+---------+------+--------------------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|               upper|
+----+---------+------------+----------+------+------------+---------+------+--------------------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|       70|     1|chevrolet chevell...|CHEVROLET CHEVELL...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|       70|     1|   buick skylark 320|   BUICK SKYLARK 320|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|       70|     1|  plymouth satellite|  PLYMOUTH SATELLITE|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|       70|     1|       amc rebel sst|       AMC REBEL SST|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|       70|     1|         ford torino|         FORD TORINO|
|15.0|      8.0|       429.0|     198.0|

In [34]:
# Preview the frame with a lower-cased copy of 'carname' in a new 'lower' column.
# The result is only displayed; auto_df_fixed itself is not reassigned.
(
    auto_df_fixed
    .withColumn('lower', lower(col('carname')))
    .show()
)

+----+---------+------------+----------+------+------------+---------+------+--------------------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|               lower|
+----+---------+------------+----------+------+------------+---------+------+--------------------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|       70|     1|chevrolet chevell...|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|       70|     1|   buick skylark 320|   buick skylark 320|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|       70|     1|  plymouth satellite|  plymouth satellite|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|       70|     1|       amc rebel sst|       amc rebel sst|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|       70|     1|         ford torino|         ford torino|
|15.0|      8.0|       429.0|     198.0|

In [35]:
# Expand the two-digit model year (e.g. 70) into a four-digit year (1970).
# concat() yields a string, so cast explicitly on the way in and back to int on
# the way out; otherwise the replaced 'modelyear' column silently changes from
# int to string and the concatenation depends on implicit type coercion.
auto_df_fixed.withColumn(
    'modelyear',
    concat(lit('19'), col('modelyear').cast('string')).cast('int'),
).show()

+----+---------+------------+----------+------+------------+---------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|
+----+---------+------------+----------+------+------------+---------+------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     1970|     1|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     1970|     1|   buick skylark 320|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     1970|     1|  plymouth satellite|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     1970|     1|       amc rebel sst|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|     1970|     1|         ford torino|
|15.0|      8.0|       429.0|     198.0|4341.0|        10.0|     1970|     1|    ford galaxie 500|
|14.0|      8.0|       454.0|     220.0|4354.0|         9.0|     1970|     1|    chevrolet impala|
|14.0|    

In [17]:
# Bucket mpg into labelled bands; branches are evaluated top to bottom, so each
# threshold only applies to values that did not match an earlier branch.
# NOTE: a null mpg matches none of the comparisons and therefore receives the
# otherwise() label.
mpg_band = (
    when(col('mpg') <= 20, 'low')
    .when(col('mpg') <= 30, 'mid')
    .when(col('mpg') <= 40, 'high')
    .otherwise('Very High')
)

# Display the frame with 'mpg' replaced by its band label, without truncating cells.
auto_df_fixed.withColumn('mpg', mpg_band).show(truncate=False)

+---+---------+------------+----------+------+------------+---------+------+-------------------------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|carname                  |
+---+---------+------------+----------+------+------------+---------+------+-------------------------+
|low|8.0      |307.0       |130.0     |3504.0|12.0        |70       |1     |chevrolet chevelle malibu|
|low|8.0      |350.0       |165.0     |3693.0|11.5        |70       |1     |buick skylark 320        |
|low|8.0      |318.0       |150.0     |3436.0|11.0        |70       |1     |plymouth satellite       |
|low|8.0      |304.0       |150.0     |3433.0|12.0        |70       |1     |amc rebel sst            |
|low|8.0      |302.0       |140.0     |3449.0|10.5        |70       |1     |ford torino              |
|low|8.0      |429.0       |198.0     |4341.0|10.0        |70       |1     |ford galaxie 500         |
|low|8.0      |454.0       |220.0     |4354.0|9.0         |70       |1   

In [36]:
# Preview the frame with a constant column 'xyz' holding the literal 19 on every row.
(
    auto_df_fixed
    .withColumn('xyz', lit(19))
    .show()
)

+----+---------+------------+----------+------+------------+---------+------+--------------------+---+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|xyz|
+----+---------+------------+----------+------+------------+---------+------+--------------------+---+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|       70|     1|chevrolet chevell...| 19|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|       70|     1|   buick skylark 320| 19|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|       70|     1|  plymouth satellite| 19|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|       70|     1|       amc rebel sst| 19|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|       70|     1|         ford torino| 19|
|15.0|      8.0|       429.0|     198.0|4341.0|        10.0|       70|     1|    ford galaxie 500| 19|
|14.0|      8.0|       454.0|     220.0|4354.0|         9.0|       70|   

In [37]:
# This cell refers to F.col / F.rank, but the notebook otherwise only performs
# `from pyspark.sql.functions import *`, which never binds the name `F`.
# Import the alias here so the cell also runs on a freshly restarted kernel.
from pyspark.sql import functions as F

# Sample data: (EmployeeID, EmployeeName, PerformanceScore)
data = [
    (1, 'John', 85),
    (2, 'Alice', 92),
    (3, 'Bob', 85),
    (4, 'Charlie', 78),
    (5, 'Eve', 92),
    (6, 'Frank', 78),
]

# Create a Spark session (returns the already-running session when one exists)
spark = SparkSession.builder.appName("RankExample").getOrCreate()

# Create a DataFrame with the sample data
columns = ["EmployeeID", "EmployeeName", "PerformanceScore"]
df = spark.createDataFrame(data, columns)

# Define a window specification for ranking: highest score first.
# There is no partitionBy, so Spark moves all rows to a single partition
# (it logs a WindowExec warning about this); fine for a six-row example.
window_spec = Window.orderBy(F.col("PerformanceScore").desc())

# Add a PerformanceRank column using the rank function
# (tied scores share a rank and the following rank is skipped).
df = df.withColumn("PerformanceRank", F.rank().over(window_spec))

# Keep the rows ranked 1 through 4. For the integer ranks produced by rank(),
# this is the same set of rows as `(rank <= 3) | (rank == 4)`.
subquery = (
    df.select("EmployeeID", "EmployeeName", "PerformanceScore", "PerformanceRank")
    .filter(F.col("PerformanceRank") <= 4)
)

# Show the result
subquery.show()


24/01/14 01:25:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/01/14 01:25:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/14 01:25:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/14 01:25:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+------------+----------------+---------------+
|EmployeeID|EmployeeName|PerformanceScore|PerformanceRank|
+----------+------------+----------------+---------------+
|         2|       Alice|              92|              1|
|         5|         Eve|              92|              1|
|         1|        John|              85|              3|
|         3|         Bob|              85|              3|
+----------+------------+----------------+---------------+



24/01/14 01:25:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/14 01:25:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [38]:
# Display auto_df_fixed again. The preceding withColumn(...).show() cells never
# reassigned it, so it still has only the original nine columns with the casts applied.
auto_df_fixed.show()

+----+---------+------------+----------+------+------------+---------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin|             carname|
+----+---------+------------+----------+------+------------+---------+------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|       70|     1|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|       70|     1|   buick skylark 320|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|       70|     1|  plymouth satellite|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|       70|     1|       amc rebel sst|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|       70|     1|         ford torino|
|15.0|      8.0|       429.0|     198.0|4341.0|        10.0|       70|     1|    ford galaxie 500|
|14.0|      8.0|       454.0|     220.0|4354.0|         9.0|       70|     1|    chevrolet impala|
|14.0|    

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create Spark session
# NOTE: a session already exists by the time this cell runs, so getOrCreate()
# returns that existing session (Spark logs "Using an existing Spark session;
# only runtime SQL configurations will take effect" for this cell).
spark = SparkSession.builder.appName("example_UDF").getOrCreate()

# Define custom function
def my_custom_function(value):
    """Return ``value`` upper-cased, passing ``None`` through unchanged.

    Args:
        value: The string to convert, or ``None``. When this function is
            wrapped as a Spark UDF, null column values arrive as ``None``.

    Returns:
        The upper-cased string, or ``None`` when ``value`` is ``None``.
    """
    # Guard against nulls: calling .upper() on None would raise AttributeError
    # and fail the whole Spark job for a single null row.
    if value is None:
        return None
    return value.upper()

# Wrap the Python function as a Spark UDF whose declared return type is string
my_udf = udf(my_custom_function, StringType())

# One-column sample DataFrame of names
columns = ["name"]
data = [
    ("John",),
    ("Doe",),
    ("Alice",),
]

# Build the frame and derive 'name_uppercase' by running the UDF over 'name'
df = spark.createDataFrame(data, columns).withColumn(
    "name_uppercase", my_udf(col("name"))
)

# Show the result
df.show()


24/01/14 01:25:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----+--------------+
| name|name_uppercase|
+-----+--------------+
| John|          JOHN|
|  Doe|           DOE|
|Alice|         ALICE|
+-----+--------------+



In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Obtain the Spark session
spark = SparkSession.builder.appName("example_UDF").getOrCreate()

# Two-column sample DataFrame of first and last names
columns = ["first_name", "last_name"]
data = [
    ("John", "Doe"),
    ("Alice", "Smith"),
    ("Bob", "Johnson"),
]
df = spark.createDataFrame(data, schema=columns)

# Define a UDF to concatenate first and last names
def concatenate_names(first_name, last_name):
    """Join the two values with a single space and return the resulting string.

    Both arguments are formatted with their default string formatting, so
    non-string inputs (including ``None``) are rendered as text rather than
    raising.
    """
    return "{} {}".format(first_name, last_name)

# Wrap concatenate_names as a Spark UDF whose declared return type is string
concatenate_names_udf = udf(concatenate_names, StringType())

# Derive 'full_name' by passing both name columns to the UDF row by row
full_name_column = concatenate_names_udf(col("first_name"), col("last_name"))
df = df.withColumn("full_name", full_name_column)

# Show the result
df.show()


+----------+---------+-----------+
|first_name|last_name|  full_name|
+----------+---------+-----------+
|      John|      Doe|   John Doe|
|     Alice|    Smith|Alice Smith|
|       Bob|  Johnson|Bob Johnson|
+----------+---------+-----------+



In [41]:
# Specify the path to your JSON file
json_file_path = "/home/basu/data/sample.json"

# Read JSON file with PERMISSIVE mode.
# NOTE: PERMISSIVE does not discard malformed records; it keeps them, with the
# fields it could not parse set to null. Use mode "DROPMALFORMED" to drop them
# or "FAILFAST" to raise on the first bad record.
df = spark.read.option("mode", "PERMISSIVE").json(json_file_path)

# Show the DataFrame (nothing has been filtered out at this point)
df.show()


+---+-------------+-----+
|age|         city| name|
+---+-------------+-----+
| 25|     New York| John|
| 30|San Francisco|Alice|
| 28|  Los Angeles|  Bob|
+---+-------------+-----+



In [None]:
# TODO(review): unfinished cell. The original statement stopped mid-string as
#     df.withColumn('age
# which is a SyntaxError and breaks Restart & Run All. Complete the intended
# transformation of the 'age' column or delete this cell.

In [47]:
# Specify the correct path to your CSV file
csv_file_path = '/home/basu/data/finance.csv'

# Read the CSV using its first line as the header and let Spark infer column types.
df = spark.read.option('header', 'True').option('inferSchema', 'True').csv(csv_file_path)

# Keep three columns under shorter names and cap the result at 50 rows.
# The second alias previously read 'inddustry_code' (typo); it is now
# 'industry_code'. The arguments sit inside parentheses, so no backslash
# line continuation is needed.
df_selected = df.select(
    'year',
    col('Industry_aggregation_NZSIOC').alias('industry_agg'),
    col('Industry_code_NZSIOC').alias('industry_code'),
).limit(50)

# Number of rows actually selected (at most 50 because of the limit above).
df_selected.count()

50