In [2]:
from pyspark.sql import SparkSession
spark  = SparkSession.builder.appName("customers_data").master("yarn").enableHiveSupport().getOrCreate()

25/11/05 18:37:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,DateType,BooleanType

In [19]:
customer_schema = StructType([
  
    StructField("customer_id" , IntegerType()),
    StructField("name" , StringType()),
    StructField("city" , StringType()),
    StructField("state" , StringType()),
    StructField("country" , StringType()),
    StructField("registration_date", StringType()),
    StructField("is_active", BooleanType())
    
])

# read registration_date as string to avoid marking invalid date formats as null, handle invalid date formats later


In [33]:
customers_data = spark.read.format('csv').option("header","true").option("schema",customer_schema).load('/data/customers.csv')
customers_data.show()

                                                                                

+-----------+-----------+---------+-----------+-------+-----------------+---------+
|customer_id|       name|     city|      state|country|registration_date|is_active|
+-----------+-----------+---------+-----------+-------+-----------------+---------+
|          0| Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    False|
|          1| Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2| Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3| Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4| Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    False|
|          5| Customer_5|Hyderabad|  Karnataka|  India|       2023-07-28|    False|
|          6| Customer_6|     Pune|      Delhi|  India|       2023-08-29|    False|
|          7| Customer_7|Ahmedabad|West Bengal|  India|       2023-12-28|     True|
|          8| Customer_8|     Pune|  Karnataka|  India|       2023-06-22|   

In [34]:
customers_data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- is_active: string (nullable = true)



In [35]:
from pyspark.sql.functions import to_date,col

invalid_dates = customers_data.filter(to_date(customers_data['registration_date'],'yyyy-MM-dd').isNull() & customers_data['registration_date'].isNotNull())

In [36]:
invalid_dates.select("registration_date").distinct().show()

+-----------------+
|registration_date|
+-----------------+
+-----------------+



In [37]:
customers_data = customers_data.withColumn('registration_date', to_date(customers_data['registration_date'],'yyyy-MM-dd'))

In [38]:
customers_data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: string (nullable = true)



In [40]:
from pyspark.sql.functions import upper,lower

customers_data = customers_data.withColumn('city', lower(customers_data['city']))

In [41]:
customers_data.show()

+-----------+-----------+---------+-----------+-------+-----------------+---------+
|customer_id|       name|     city|      state|country|registration_date|is_active|
+-----------+-----------+---------+-----------+-------+-----------------+---------+
|          0| Customer_0|     pune|Maharashtra|  India|       2023-06-29|    False|
|          1| Customer_1|bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2| Customer_2|hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3| Customer_3|bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4| Customer_4|ahmedabad|  Karnataka|  India|       2023-03-14|    False|
|          5| Customer_5|hyderabad|  Karnataka|  India|       2023-07-28|    False|
|          6| Customer_6|     pune|      Delhi|  India|       2023-08-29|    False|
|          7| Customer_7|ahmedabad|West Bengal|  India|       2023-12-28|     True|
|          8| Customer_8|     pune|  Karnataka|  India|       2023-06-22|   

In [42]:
customers_data = customers_data.withColumn('country', upper(customers_data['country']))
customers_data.show()

+-----------+-----------+---------+-----------+-------+-----------------+---------+
|customer_id|       name|     city|      state|country|registration_date|is_active|
+-----------+-----------+---------+-----------+-------+-----------------+---------+
|          0| Customer_0|     pune|Maharashtra|  INDIA|       2023-06-29|    False|
|          1| Customer_1|bangalore| Tamil Nadu|  INDIA|       2023-12-07|     True|
|          2| Customer_2|hyderabad|    Gujarat|  INDIA|       2023-10-27|     True|
|          3| Customer_3|bangalore|  Karnataka|  INDIA|       2023-10-17|    False|
|          4| Customer_4|ahmedabad|  Karnataka|  INDIA|       2023-03-14|    False|
|          5| Customer_5|hyderabad|  Karnataka|  INDIA|       2023-07-28|    False|
|          6| Customer_6|     pune|      Delhi|  INDIA|       2023-08-29|    False|
|          7| Customer_7|ahmedabad|West Bengal|  INDIA|       2023-12-28|     True|
|          8| Customer_8|     pune|  Karnataka|  INDIA|       2023-06-22|   

In [43]:
customers_data.write.mode('overwrite').saveAsTable('customers_table')

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
25/11/05 19:05:44 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [45]:
spark.sql('show tables').show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|  default|      customers|      false|
|  default|customers_table|      false|
+---------+---------------+-----------+



In [46]:
spark.sql('drop table customers').show()

++
||
++
++



In [47]:
spark.sql('show tables').show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|  default|customers_table|      false|
+---------+---------------+-----------+



In [53]:
spark.sql('select state, count(*) as number_of_customers from customers_table group by state').show()

+-----------+-------------------+
|      state|number_of_customers|
+-----------+-------------------+
|    Gujarat|                 18|
|      Delhi|                 10|
|  Karnataka|                 16|
|  Telangana|                 16|
|Maharashtra|                 11|
| Tamil Nadu|                 15|
|West Bengal|                 13|
+-----------+-------------------+



In [60]:
spark.sql('''select is_active, count(*) from customers_table where state='Gujarat' group by is_active''').show()

+---------+--------+
|is_active|count(1)|
+---------+--------+
|     True|      11|
|    False|       7|
+---------+--------+



In [61]:
spark.stop()

In [1]:
from pyspark.sql import SparkSession