In [None]:
# Welcome to your new notebook
# Type here in the cell editor to add code!


In [None]:
df = spark.sql("SELECT * FROM H1B.visa LIMIT 1000")
display(df)

In [25]:
%%sql
ALTER TABLE visa
SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5'
);


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 28, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

### <mark>**DROPPING UNNECESSARY COLUMNS**</mark>

In [27]:
%%sql
ALTER TABLE visa
DROP COLUMN BASIS_FOR_CLASSIFICATION,BEN_CURRENT_CLASS,NUMBER_OF_BENEFICIARIES,country_of_nationality, NAICS_CODE, BEN_SEX, WORKSITE_CITY,WORKSITE_STATE,WORKSITE_STREET,WORKSITE_ZIP,mail_addr,ben_date_of_birth,i129_employer_name,PET_CITY,PET_STATE,PET_STREET,PET_ZIP,REQUESTED_ACTION,REQUESTED_CLASS,DOL_ETA_CASE_NUMBER,FULL_TIME_IND,WAGE_AMT,WAGE_UNIT,valid_from,valid_to,S1Q1A,S1Q1B,S3Q1,T_U_VAWA_FLAG;
--DROP COLUMNS bcn, agent_first_name,agent_last_name,S4Q1, DOT_CODE,NUM_OF_EMP_IN_US,BEN_COUNTRY_OF_BIRTH,RECEIPT_NUMBER,zip

StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 30, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, floor, rand, datediff, date_add, date_sub, make_date

# Initialize Spark session
spark = SparkSession.builder.appName("UpdateRecDate").getOrCreate()

# Load the visa data
visa_df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Step 1: Create a temporary DataFrame with the updated `rec_date` values
updated_df = visa_df.withColumn(
    "new_rec_date",
    expr("""
        CASE 
            WHEN lottery_year IS NOT NULL THEN 
                DATE_ADD(DATE_SUB(MAKE_DATE(lottery_year, 8, 1), 365), CAST(FLOOR(RAND() * DATEDIFF(DAY, DATE_SUB(MAKE_DATE(lottery_year, 8, 1), 365), MAKE_DATE(lottery_year, 3, 31))) AS INT))
            ELSE 
                DATE_ADD(DATE_SUB(MAKE_DATE(2000, 8, 1), 365), CAST(FLOOR(RAND() * DATEDIFF(DAY, DATE_SUB(MAKE_DATE(2000, 8, 1), 365), MAKE_DATE(2000, 3, 31))) AS INT))
        END
    """)
)

# Step 2: Drop the existing `rec_date` column
final_df = updated_df.drop("rec_date")

# Step 3: Rename the new `rec_date` column to `rec_date`
final_df = final_df.withColumnRenamed("new_rec_date", "rec_date")

# Step 4: Save the updated DataFrame back to the Delta table
final_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, a3ffd4fb-faa5-4761-8703-f4193546543d, 12, Finished, Available, Finished)

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, floor, rand, datediff, date_add

# Initialize Spark session
spark = SparkSession.builder.appName("UpdateFirstDecisionDate").getOrCreate()

# Load the visa data
visa_df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Step 1: Create a temporary DataFrame with the updated `first_decision_date` values
updated_df = visa_df.withColumn(
    "new_first_decision_date",
    expr("""
        CASE 
            WHEN rec_date IS NOT NULL THEN 
                DATE_ADD(CAST(rec_date AS DATE), CAST(FLOOR(RAND() * (120 - 60 + 1) + 60) AS INT))
            ELSE 
                NULL
        END
    """)
)

# Step 2: Drop the existing `first_decision_date` column
final_df = updated_df.drop("first_decision_date")

# Step 3: Rename the new `first_decision_date` column to `first_decision_date`
final_df = final_df.withColumnRenamed("new_first_decision_date", "first_decision_date")

# Step 4: Save the updated DataFrame back to the original Delta table
final_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, a3ffd4fb-faa5-4761-8703-f4193546543d, 13, Finished, Available, Finished)

### <mark>**CHANGE THE NULL VALUE TO DENIED**</mark>

In [28]:
%%sql
UPDATE visa
SET FIRST_DECISION = 'Denied'
WHERE FIRST_DECISION IS NULL


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 31, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Initialize Spark session
spark = SparkSession.builder.appName("UpdateFirstDecisionColumn").getOrCreate()

# Load the visa data
visa_df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Update the `FIRST_DECISION` column to change NULL values to 'Denied'
updated_df = visa_df.withColumn("FIRST_DECISION", when(visa_df["FIRST_DECISION"].isNull(), "Denied").otherwise(visa_df["FIRST_DECISION"]))

# Save the updated DataFrame back to the original Delta table
updated_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 8, Finished, Available, Finished)

In [35]:
%%sql
SELECT 
    SUM(CASE WHEN country_of_birth IS NULL THEN 1 ELSE 0 END) AS null_country_of_birth,
    SUM(CASE WHEN ben_year_of_birth IS NULL THEN 1 ELSE 0 END) AS null_ben_year_of_birth,
    SUM(CASE WHEN gender IS NULL THEN 1 ELSE 0 END) AS null_gender,
    SUM(CASE WHEN employer_name IS NULL THEN 1 ELSE 0 END) AS null_employer_name,
    SUM(CASE WHEN FEIN IS NULL THEN 1 ELSE 0 END) AS null_FEIN,
    SUM(CASE WHEN city IS NULL THEN 1 ELSE 0 END) AS null_city,
    SUM(CASE WHEN state IS NULL THEN 1 ELSE 0 END) AS null_state,
    --SUM(CASE WHEN zip IS NULL THEN 1 ELSE 0 END) AS null_zip,
    SUM(CASE WHEN lottery_year IS NULL THEN 1 ELSE 0 END) AS null_lottery_year,
    SUM(CASE WHEN status_type IS NULL THEN 1 ELSE 0 END) AS null_status_type,
    SUM(CASE WHEN rec_date IS NULL THEN 1 ELSE 0 END) AS null_rec_date,
    SUM(CASE WHEN FIRST_DECISION IS NULL THEN 1 ELSE 0 END) AS null_FIRST_DECISION,
    SUM(CASE WHEN first_decision_date IS NULL THEN 1 ELSE 0 END) AS null_first_decision_date,
    SUM(CASE WHEN JOB_TITLE IS NULL THEN 1 ELSE 0 END) AS null_JOB_TITLE,
    SUM(CASE WHEN ED_LEVEL_DEFINITION IS NULL THEN 1 ELSE 0 END) AS null_ED_LEVEL_DEFINITION,
    SUM(CASE WHEN BEN_PFIELD_OF_STUDY IS NULL THEN 1 ELSE 0 END) AS null_BEN_PFIELD_OF_STUDY,
    SUM(CASE WHEN BEN_COMP_PAID IS NULL THEN 1 ELSE 0 END) AS null_BEN_COMP_PAID,
    SUM(CASE WHEN Age IS NULL THEN 1 ELSE 0 END) AS null_Age,
    SUM(CASE WHEN Time IS NULL THEN 1 ELSE 0 END) AS null_Time
    
FROM visa;

SELECT COUNT(*)
FROM visa

StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 41, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 18 fields>

<Spark SQL result set with 1 rows and 1 fields>

## <mark>**CHANGE THE COLUMN DATATYPE**</mark>

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("ChangeColumnDatatype").getOrCreate()

# Load the visa data
visa_df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Change the datatype of the specified columns
changed_df = visa_df \
    .withColumn("ben_year_of_birth", col("ben_year_of_birth").cast("int")) \
    .withColumn("FEIN", col("FEIN").cast("int")) \
    .withColumn("lottery_year", col("lottery_year").cast("int")) \
    .withColumn("BEN_COMP_PAID", col("BEN_COMP_PAID").cast("decimal(12,2)"))

# Save the updated DataFrame back to the original Delta table
changed_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 32, Finished, Available, Finished)

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, year, current_date

# Initialize Spark session
spark = SparkSession.builder.appName("AddColumns").getOrCreate()

# Load the visa data
visa_df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Add the new columns
visa_df = visa_df \
    .withColumn("Time", datediff(col("first_decision_date"), col("rec_date"))) \
    .withColumn("Age", year(current_date()) - col("ben_year_of_birth"))

# Save the updated DataFrame back to the original Delta table
visa_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 33, Finished, Available, Finished)

### <mark>**REMOVE THE NULL VALUES IN THE DATE COLUMNS**</mark>

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("RemoveNullRows").getOrCreate()

# Load the data
df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")

# Filter out rows with NULL values in rec_date or first_decision_date
cleaned_df = df.filter(col("FEIN").isNotNull())
#col("rec_date").isNotNull() & col("first_decision_date").isNotNull() &

# Save the cleaned DataFrame back to the original Delta table
cleaned_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/visa")


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 36, Finished, Available, Finished)

In [33]:
%%sql
UPDATE visa
SET ED_LEVEL_DEFINITION = CASE
    WHEN ED_LEVEL_DEFINITION = "MASTER'S DEGREE" THEN "MASTER"
    WHEN ED_LEVEL_DEFINITION = "BACHELOR'S DEGREE" THEN "BACHELOR"
    WHEN ED_LEVEL_DEFINITION = "DOCTORATE DEGREE" THEN "DOCTORATE"
    WHEN ED_LEVEL_DEFINITION = "PROFESSIONAL DEGREE" THEN "PROFESSIONAL"
    WHEN ED_LEVEL_DEFINITION = "ASSOCIATE'S DEGREE" THEN "ASSOCIATE"
    WHEN ED_LEVEL_DEFINITION = "HIGH SCHOOL GRADUATE" THEN "ASSOCIATE"
    WHEN ED_LEVEL_DEFINITION = "1 OR MORE YEARS OF COLLEGE, NO DEGREE" THEN "COLLEGE"
    WHEN ED_LEVEL_DEFINITION = "NO CODE PROVIDED OR ILLEGIBLE" THEN "COLLEGE"
    WHEN ED_LEVEL_DEFINITION = "SOME COLLEGE CREDIT, BUT LESS THAN 1 YEAR" THEN "COLLEGE"
    WHEN ED_LEVEL_DEFINITION = "NO DIPLOMA" THEN "CERTIFICATION"
    ELSE ED_LEVEL_DEFINITION
END


StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 37, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

### <mark>**IMPORT COUNTRIES AND LOAD IT AS TABLE**</mark>

In [None]:
import pandas as pd
# Load data into pandas DataFrame from "/lakehouse/default/Files/Countries.csv"
df = pd.read_csv("/lakehouse/default/Files/Countries.csv")
display(df)


In [34]:
import pandas as pd
from pyspark.sql import SparkSession

# Clean column names to remove invalid characters
df.columns = [col.replace(" ", "_").replace("(", "").replace(")", "") for col in df.columns]

# Convert pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Save the Spark DataFrame as a table
spark_df.write.saveAsTable("Countries")



StatementMeta(, 7e19aade-3cda-42bc-bc89-cea28617dd42, 37, Finished, Available, Finished)

### <mark>**DON'T TOUCH THIS ONE**</mark>

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("RemoveNullColumns").getOrCreate()

# Load the data
df = spark.read.format("delta").load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/visa.Lakehouse/Tables/visa")

# Identify columns with NULL values
columns_to_drop = [col for col in df.columns if df.filter(df[col].isNull()).count() > 0]

# Remove columns with NULL values
cleaned_df = df.drop(*columns_to_drop)

# Save the updated DataFrame back to the original Delta table
cleaned_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/visa.Lakehouse/Tables/visa")


StatementMeta(, 7f0cafd4-792a-4ad2-b32c-a41b8e8607e1, 3, Finished, Available, Finished)

### <mark>**RESTORE PREVIOUS WORK AFTER MESSING UP THE DATA**</mark>

In [5]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("TimeTravel").getOrCreate()

# Retrieve a previous version of the table (e.g., version 2)
previous_df = spark.read.format("delta").option("versionAsOf", 2).load("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/visa.Lakehouse/Tables/visa")

# Save the previous version DataFrame back to the original Delta table
previous_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/visa.Lakehouse/Tables/visa")


StatementMeta(, 7f0cafd4-792a-4ad2-b32c-a41b8e8607e1, 7, Finished, Available, Finished)

In [None]:
df = spark.sql("SELECT * FROM visa.countries LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM visa.visa LIMIT 1000")
display(df)

StatementMeta(, a3ffd4fb-faa5-4761-8703-f4193546543d, 22, Finished, Available, Finished)

In [23]:
%%sql
SELECT COUNT(*) FROM VISA WHERE ED_LEVEL_DEFINITION='BACHELOR'

StatementMeta(, 2bfe3bc0-38c8-4011-b1f3-b66dcff7121e, 26, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

### **<mark>CHANGE THE TABLE SET</mark>**

In [2]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Correct table path - Use the full Fabric Lakehouse path
table_path = "abfss://BI_PROJECT@onelake.dfs.fabric.microsoft.com/H1B.Lakehouse/Tables/H1B"  # Update this with the correct location of your table in the lakehouse

# Refresh the Delta table metadata with the correct path
spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")


StatementMeta(, f3ca67ba-e945-4fc3-9f67-84d45bf63455, 4, Finished, Available, Finished)

DataFrame[]

In [5]:
%%sql
CREATE TABLE H1B_Data (
	[country_of_birth] varchar(8000) NULL, 
	[ben_year_of_birth] int NULL, 
	[gender] varchar(8000) NULL, 
	[employer_name] varchar(8000) NULL, 
	[FEIN] int NULL, 
	[city] varchar(8000) NULL, 
	[state] varchar(8000) NULL, 
	[lottery_year] int NULL, 
	[status_type] varchar(8000) NULL, 
	[ben_multi_reg_ind] varchar(8000) NULL, 
	[FIRST_DECISION] varchar(8000) NULL, 
	[JOB_TITLE] varchar(8000) NULL, 
	[BEN_EDUCATION_CODE] varchar(8000) NULL, 
	[ED_LEVEL_DEFINITION] varchar(8000) NULL, 
	[BEN_PFIELD_OF_STUDY] varchar(8000) NULL, 
	[BEN_COMP_PAID] decimal(12,2) NULL, 
	[rec_date] date NULL, 
	[first_decision_date] date NULL, 
	[Time] int NULL, 
	[Age] int NULL
);


StatementMeta(, f3ca67ba-e945-4fc3-9f67-84d45bf63455, 7, Finished, Available, Finished)

Error: 
[PARSE_SYNTAX_ERROR] Syntax error at or near '['.(line 2, pos 1)

== SQL ==
CREATE TABLE H1B_Data (
	[country_of_birth] varchar(8000) NULL, 
-^^^
	[ben_year_of_birth] int NULL, 
	[gender] varchar(8000) NULL, 
	[employer_name] varchar(8000) NULL, 
	[FEIN] int NULL, 
	[city] varchar(8000) NULL, 
	[state] varchar(8000) NULL, 
	[lottery_year] int NULL, 
	[status_type] varchar(8000) NULL, 
	[ben_multi_reg_ind] varchar(8000) NULL, 
	[FIRST_DECISION] varchar(8000) NULL, 
	[JOB_TITLE] varchar(8000) NULL, 
	[BEN_EDUCATION_CODE] varchar(8000) NULL, 
	[ED_LEVEL_DEFINITION] varchar(8000) NULL, 
	[BEN_PFIELD_OF_STUDY] varchar(8000) NULL, 
	[BEN_COMP_PAID] decimal(12,2) NULL, 
	[rec_date] date NULL, 
	[first_decision_date] date NULL, 
	[Time] int NULL, 
	[Age] int NULL
)


In [4]:
df = spark.sql("SELECT * FROM H1B.H1B LIMIT 1000")
display(df)

StatementMeta(, f3ca67ba-e945-4fc3-9f67-84d45bf63455, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b09ed681-e203-47da-aee3-1f0c1d8517b8)

In [1]:
df = spark.read.format("delta").load("Tables/H1B")
df.printSchema()


StatementMeta(, b4e362f5-963b-4568-8197-eeecbacdcc05, 3, Finished, Available, Finished)

root
 |-- country_of_birth: string (nullable = true)
 |-- ben_year_of_birth: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- employer_name: string (nullable = true)
 |-- FEIN: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lottery_year: integer (nullable = true)
 |-- status_type: string (nullable = true)
 |-- ben_multi_reg_ind: string (nullable = true)
 |-- FIRST_DECISION: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- BEN_EDUCATION_CODE: string (nullable = true)
 |-- ED_LEVEL_DEFINITION: string (nullable = true)
 |-- BEN_PFIELD_OF_STUDY: string (nullable = true)
 |-- BEN_COMP_PAID: decimal(12,2) (nullable = true)
 |-- rec_date: date (nullable = true)
 |-- first_decision_date: date (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Age: integer (nullable = true)



In [2]:
db = spark.sql("CREATE OR REPLACE TABLE delta.`Tables/H1B` AS SELECT * FROM delta.`Tables/H1B`")


StatementMeta(, b4e362f5-963b-4568-8197-eeecbacdcc05, 4, Finished, Available, Finished)

DataFrame[]

## **<mark>ADDING A NEW COLUMN (Approval_statut : 0 / 1) FOR THE AUTO ML</mark>**

In [1]:
%%sql
ALTER TABLE H1B
ADD COLUMN Approval_Statut INT


StatementMeta(, 59bb9ea0-002c-4aab-874f-91157d2bb230, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [4]:
%%sql
UPDATE H1B
SET Approval_statut = CASE
                    WHEN FIRST_DECISION ="Approved" THEN 1
                    ELSE 0
                    END

StatementMeta(, 59bb9ea0-002c-4aab-874f-91157d2bb230, 6, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [5]:
df = spark.sql("SELECT * FROM H1B.H1B LIMIT 1000")
display(df)

StatementMeta(, 59bb9ea0-002c-4aab-874f-91157d2bb230, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6b39c6bf-54ce-40fd-827d-4d674ba8ce5f)