In [0]:
from pyspark.sql.types import *

# Explicit schema for the Steam game-log CSV.
# playhours is declared as a double: a bare DecimalType() means
# DecimalType(precision=10, scale=0), and a scale of 0 would drop the
# fractional part of every value read into the column.
c_schema = StructType([
    StructField("gamer_id", IntegerType(), True),
    StructField("game", StringType(), True),
    StructField("behavior", StringType(), True),
    StructField("playhours", DoubleType(), True),
    StructField("ratings", StringType(), True),
])

# Read the CSV with the schema above; the first line is treated as a header.
df = (
    spark.read
    .option("header", True)
    .schema(c_schema)
    .csv("/Volumes/workspace/garage/files/datasets/steam-200k.csv")
)
display(df)
df.printSchema()

In [0]:
# Count how many different values the "game" column holds.
(
    df
    .select("game")
    .distinct()
    .count()
)

In [0]:
# Number of partitions of the RDD underlying df.
(
    df.rdd
    .getNumPartitions()
)

In [0]:
# Write df as Parquet without partition columns, replacing any earlier output.
(
    df.write
    .mode("overwrite")
    .parquet("/Volumes/workspace/garage/files/datasets/gamelogs_unpart")
)

In [0]:
# Show what was written under the unpartitioned output path.
display(
    dbutils.fs.ls("/Volumes/workspace/garage/files/datasets/gamelogs_unpart")
)


In [0]:
# Write df as Parquet again, this time partitioned by the "game" column,
# replacing any earlier output, then list the resulting directory.
(
    df.write
    .mode("overwrite")
    .partitionBy("game")
    .parquet("/Volumes/workspace/garage/files/datasets/gamelogs_part")
)
display(
    dbutils.fs.ls("/Volumes/workspace/garage/files/datasets/gamelogs_part")
)


In [0]:
# Load the cancer CSV with a header row, UTF-8 encoding and a comma separator.
# The separator option key must be spelled "delimiter" (alias of "sep");
# the previous misspelling "delemeter" was not a recognised CSV option, so
# the setting never took effect.
# Note: this rebinds `df`, which earlier cells used for the Steam data.
df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .option("encoding", "UTF-8")
    .csv("/Volumes/workspace/garage/files/datasets/cancer.csv")
)

display(df)

In [0]:
# Print the column names and types of whatever `df` currently refers to.
df.printSchema()

In [0]:
from pyspark.sql.types import *

# Explicit schema for the customers CSV. Every field is nullable; "gender" is
# the only string column, the rest are integers.
# Note: this rebinds c_schema, which an earlier cell used for a different file.
c_schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("customer_id", IntegerType),
        ("gender", StringType),
        ("age", IntegerType),
        ("annual_income_k_usd", IntegerType),
        ("score", IntegerType),
    )
])

In [0]:
# Read the mall-customers CSV with the explicit schema in c_schema.
# The separator option key must be spelled "delimiter" (alias of "sep");
# the previous misspelling "delemeter" was not a recognised CSV option, so
# the setting never took effect.
df_custom_schema = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .schema(c_schema)
    .csv("/Volumes/workspace/garage/files/datasets/Mall_Customers.csv")
)
display(df_custom_schema)
df_custom_schema.printSchema()

In [0]:
# Bring the first two rows of df to the driver as a list and print it.
li = df.limit(2).collect()
print(li)

In [0]:
# Print the number of collected rows, then the "State" field of the first
# one, each on its own line.
print(len(li), li[0]["State"], sep="\n")

In [0]:
# Read the used-cars JSON file with the multiline option enabled.
df_json = (
    spark.read
    .option("multiline", True)
    .json('/Volumes/workspace/garage/files/datasets/used_cars_nested.json')
)
display(df_json)

In [0]:
# Read the order list CSV: first line is a header, column types are inferred.
df_order_list = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/Volumes/workspace/garage/files/datasets/sales/orderlist.csv')
)
display(df_order_list)


In [0]:
from pyspark.sql.functions import col,current_timestamp,current_date,to_date,to_timestamp,dayofmonth,day,year,quarter,month,weekofyear,to_date,to_timestamp,date_add,date_sub,date_trunc,date_format,add_months,months_between,date_diff
# Date-function showcase built on the order list.
df_ps10 = (
    df_order_list
    # Parse the "Order Date" text using the day-month-year pattern.
    .withColumn("OrderDate", to_date(col("Order Date"), "dd-MM-yyyy"))
    # Evaluation-time date and timestamp.
    .withColumn("current_date", current_date())
    .withColumn("current_timestamp", current_timestamp())
    # Calendar parts of the parsed order date.
    .withColumn("Day", dayofmonth(col("OrderDate")))
    .withColumn("Year", year(col("OrderDate")))
    .withColumn("Quarter", quarter(col("OrderDate")))
    .withColumn("Month", month(col("OrderDate")))
    # Shifts by a fixed number of days or months.
    .withColumn("order_next_10_day", date_add(col("OrderDate"), 10))
    .withColumn("order_prev_10_day", date_sub(col("OrderDate"), 10))
    .withColumn("Order_next_month", add_months(col("OrderDate"), 1))
    .withColumn("order_prev_month", add_months(col("OrderDate"), -1))
    # date_diff(end, start): this is OrderDate minus current_date, so orders in
    # the past produce negative numbers.
    # NOTE(review): confirm that sign is the intended one.
    .withColumn("date_diff", date_diff(col("OrderDate"), col("current_date")))
)

display(df_ps10)

In [0]:
from pyspark.sql.functions import lpad,rpad,repeat,split,col,regexp_replace,regexp_extract,upper,lower,initcap,concat_ws,concat,substring,length,regexp_replace,regexp_extract_all,regexp_extract,sha1,sha2,md5,substring_index

In [0]:
# Add the character length of CustomerName as a new column.
df_ps09 = (
    df_order_list
    .withColumn("cust_length", length(col("CustomerName")))
)
display(df_ps09)

In [0]:
# String-function showcase built on the order list.
# "Order Date" is parsed elsewhere in this notebook with the pattern
# "dd-MM-yyyy", so splitting on "-" yields [day, month, year]: the year is
# element 2 (element 0, used previously, is the day) and the month is element 1.
df_ps09 = (
    df_order_list
    # Case conversions.
    .withColumn("cust_lower", lower(col("CustomerName")))
    .withColumn("cust_upper", upper(col("CustomerName")))
    .withColumn("cust_initcap", initcap(col("CustomerName")))
    # Replace every lowercase "a" with "*".
    .withColumn("hidden_column", regexp_replace(col("CustomerName"), "a", "*"))
    # First three and last three characters of the name.
    .withColumn("cust_start", substring(col("CustomerName"), 1, 3))
    .withColumn("cust_lastname", substring(col("CustomerName"), -3, 3))
    # Pieces of the raw date text.
    .withColumn("Year", split(col("Order Date"), "-")[2])
    .withColumn("Month", split(col("Order Date"), "-")[1])
    # repeat(..., 1) returns the text once, i.e. unchanged.
    .withColumn("repeat_date", repeat(col("Order Date"), 1))
    # Pad (or truncate) the name to 10 characters on the left / right.
    .withColumn("cust_lpad", lpad(col("CustomerName"), 10, "#"))
    .withColumn("cust_rpad", rpad(col("CustomerName"), 10, "$"))
)

display(df_ps09)


In [0]:
# Orders that have a customer name, sorted ascending by name and then state.
display(
    df_order_list
    .filter(df_order_list.CustomerName.isNotNull())
    .orderBy(
        [df_order_list.CustomerName, df_order_list.State],
        ascending=True,
    )
)

In [0]:
# Orders that have a customer name, sorted ascending by name with the
# nulls-last ordering requested explicitly.
display(
    df_order_list
    .filter(df_order_list.CustomerName.isNotNull())
    .orderBy(df_order_list.CustomerName.asc_nulls_last())
)

In [0]:
# Load the used-car dataset from its Parquet directory.
df_parquet = (
    spark.read
    .parquet('/Volumes/workspace/garage/files/datasets/USED_CAR_PARQUET/')
)
display(df_parquet)

In [0]:
from pyspark.sql.functions import col
# Row count per body_type, largest group first.
df_agg = (
    df_parquet
    .groupBy("body_type")
    .count()
    .orderBy(col("count").desc())
)
display(df_agg)

In [0]:
# Row count per (body_type, brand_name) pair, largest group first.
display(
    df_parquet
    .groupBy("body_type", "brand_name")
    .count()
    .orderBy(col("count").desc())
)

In [0]:
from pyspark.sql.functions import col
# Mean price per body_type, highest first. The sort refers to the aggregate
# by the column name "avg(price)".
df_avgprice = (
    df_parquet
    .groupBy("body_type")
    .mean("price")
    .orderBy(col("avg(price)").desc())
)
display(df_avgprice)
                                                                

In [0]:
# Whole-table aggregates requested as a column -> function mapping.
# Note: this rebinds df_agg, which an earlier cell used for per-body_type counts.
df_agg = df_parquet.agg(
    {
        "brand_name": "count",
        "body_type": "count",
        "price": "avg",
    }
)
display(df_agg)


In [0]:
# Compute the describe() summary of the used-car DataFrame and show it.
df_describe = df_parquet.describe()
display(df_describe)

In [0]:
# Capture the list of column names of the used-car DataFrame and print it.
all_columns =df_parquet.columns
print(all_columns)

In [0]:
# Project three columns, referenced by name.
display(
    df_parquet.select(
        "vehicle_type",
        "model",
        "price",
    )
)

In [0]:
from pyspark.sql.functions import col
# Project two columns, referenced through col() objects.
display(
    df_parquet.select(
        col("price"),
        col("model"),
    )
)


In [0]:
from pyspark.sql.functions import col
# Keep only price and model, capped at ten rows.
df_limit = (
    df_parquet
    .select(col("price"), col("model"))
    .limit(10)
)
display(df_limit)

In [0]:
from pyspark.sql.functions import concat_ws, col
# Reload the used-car data, then derive a combined name column.
df_parquet = spark.read.parquet(
    '/Volumes/workspace/garage/files/datasets/USED_CAR_PARQUET/'
)

# Join brand_name and model with a single space into "full name", then rename
# that column to "vehicle name".
df_processed = (
    df_parquet
    .withColumn("full name", concat_ws(" ", col("brand_name"), col("model")))
    .withColumnRenamed("full name", "vehicle name")
)

# Only the displayed view omits "model"; df_processed itself still has it.
display(df_processed.drop("model"))

In [0]:
# Keep only the rows whose body_type is exactly "SUV".
df_suv = df_processed.where(col("body_type") == "SUV")
display(df_suv)

In [0]:
from pyspark.sql.functions import col,regexp_replace, when
# Displacement text with every "cc" occurrence removed; it is compared
# against numeric thresholds below.
displacement_stripped = regexp_replace(df_parquet.displacement, "cc", "")

# Bucket each row into a "power" label:
#   below 1500 -> "Low", from 1500 up to (not including) 2500 -> "Medium",
#   anything else -> "High".
# Note: this rebinds df_suv and applies to all of df_processed, not just SUVs.
df_suv = df_processed.withColumn(
    "power",
    when(displacement_stripped < 1500, "Low")
    .when(
        (displacement_stripped >= 1500) & (displacement_stripped < 2500),
        "Medium",
    )
    .otherwise("High"),
)
display(df_suv)

In [0]:
# Rows where the brand is "Suzuki" and the color is "White" (exact matches).
df_suz_wh = df_processed.where(
    (col("brand_name") == "Suzuki") & (col("color") == "White")
)
display(df_suz_wh)

In [0]:
from pyspark.sql.functions import upper
# Case-insensitive brand match: upper-case the column and test it against the
# upper-cased names of the two wanted brands.
df_car = df_processed.filter(
    upper(col("brand_name")).isin("honda".upper(), "suzuki".upper())
)
display(df_car)

In [0]:
from pyspark.sql.functions import expr,regexp_replace
# Same power bucketing as the when() version, expressed as a SQL CASE.
# Note: this rebinds df_car.
df_car = (
    df_processed
    # Overwrite displacement with its text minus every "cc" occurrence.
    .withColumn("displacement", regexp_replace(col("displacement"), "cc", ""))
    .withColumn(
        "power",
        expr(
            "case when displacement < 1500 then 'Low' "
            "when displacement >= 1500 and displacement < 2500 then 'Medium' "
            "else 'High' end"
        ),
    )
    # The select below keeps five other columns, so this drop does not affect
    # the final column set.
    .drop("manufacturer", "description", "adtitle")
    .select("body_type", "brand_name", "color", "displacement", "power")
)
display(df_car)

In [0]:
%sql
-- Create three single-column integer tables, skipping any that already exist.
CREATE TABLE IF NOT EXISTS t1 (id INT);
CREATE TABLE IF NOT EXISTS t2 (id INT);
CREATE TABLE IF NOT EXISTS t3 (id INT);


In [0]:
%sql
-- Load the demo rows. INSERT OVERWRITE replaces the table contents, so
-- re-running this cell (or the whole notebook) leaves exactly these rows
-- instead of appending another copy each time, which would duplicate rows in
-- the join results further down.
insert overwrite table t1 values (1),(2),(3),(4),(NULL);
insert overwrite table t2 values (1),(2);
insert overwrite table t3 values (4),(5),(NULL);

In [0]:
# One DataFrame per demo table, each selecting every row and column.
df_1, df_2, df_3 = (
    spark.sql(query)
    for query in (
        "select * from t1",
        "select * from t2",
        "select * from t3",
    )
)

In [0]:
# Inner join of t1 and t2 on equal id values.
df_inner = df_1.join(
    df_2,
    on=(df_1.id == df_2.id),
    how="inner",
)
display(df_inner)

In [0]:
# Left join: every row of t1, matched to t2 on equal id values where possible.
df_left = df_1.join(
    df_2,
    on=(df_1.id == df_2.id),
    how="left",
)
display(df_left)

In [0]:
# Cross join: pairs every row of df_1 with every row of df_3 (no join condition).
df_cartesian = df_1.crossJoin(df_3)
display(df_cartesian)
