In [None]:
#Install yang dibutuhkan

!pip install pyspark python-dotenv pymongo
# Download JARs manually or ensure they are in the classpath/working directory:
# mongo-spark-connector_2.12-3.0.1.jar
# mongo-java-driver-3.12.10.jar
# Ensure you have a compatible JDK (like OpenJDK 11) installed and JAVA_HOME is set in your system environment variables.

## Koneksi ke MongoDB

In [None]:
# Load environment variables
import os
from dotenv import load_dotenv

load_dotenv()  # Loads variables from .env file in the current working directory or parent directories

# MongoDB credentials and settings come from the environment
mongo_connection_string = os.getenv("MONGODB_CONNECTION_STRING")
mongo_db_name = os.getenv("MONGODB_DATABASE_NAME")
mongo_input_collection = os.getenv("COLLECTION_FINANCIAL_REPORTS")
mongo_output_collection = "Transformasi_LaporanKeuangan"  # Fixed output collection for this notebook

# Validate before anything is derived from these values, and name every
# variable that is unset or empty so the .env file can be fixed directly.
required_env = {
    "MONGODB_CONNECTION_STRING": mongo_connection_string,
    "MONGODB_DATABASE_NAME": mongo_db_name,
    "COLLECTION_FINANCIAL_REPORTS": mongo_input_collection,
}
missing_env = [name for name, value in required_env.items() if not value]
if missing_env:
    raise ValueError(
        "One or more MongoDB environment variables are missing "
        f"({', '.join(missing_env)}). Check your .env file."
    )

# Input and output both use the base connection string; database and
# collection are kept in their own variables.
mongo_input_uri = mongo_connection_string
mongo_output_uri = mongo_connection_string

print(f"Using Database: {mongo_db_name}")
print(f"Input Collection: {mongo_input_collection}")
print(f"Output Collection: {mongo_output_collection}")

## Integrasi pyspark

Jika masih bermasalah, lakukan terlebih dahulu langkah-langkah instalasi yang disebutkan pada output cell.

In [None]:
import os
from pyspark.sql import SparkSession

# Warn (without stopping) when JAVA_HOME is absent from the environment.
if os.environ.get("JAVA_HOME") is None:
    print("Warning: JAVA_HOME environment variable not set. Spark might not work correctly.")

# Connector JARs are looked up relative to `jars_path` (the notebook directory by default).
jars_path = "."
jar_files = [
    os.path.join(jars_path, jar_name)
    for jar_name in (
        "mongo-spark-connector_2.12-3.0.1.jar",
        "mongo-java-driver-3.12.10.jar",
    )
]
# `spark.jars` takes a single comma-separated string.
jars = ",".join(jar_files)

# Report any JAR that is not on disk; execution continues either way.
for jar_file in jar_files:
    if os.path.exists(jar_file):
        continue
    print(f"Warning: JAR file not found at {jar_file}. Download it or update the path.")

# Settings applied to the session: the JARs plus the MongoDB input/output targets.
spark_settings = {
    "spark.jars": jars,
    "spark.mongodb.input.uri": mongo_input_uri,
    "spark.mongodb.input.database": mongo_db_name,
    "spark.mongodb.input.collection": mongo_input_collection,
    "spark.mongodb.output.uri": mongo_output_uri,
    "spark.mongodb.output.database": mongo_db_name,
    "spark.mongodb.output.collection": mongo_output_collection,
}

session_builder = SparkSession.builder.appName("MongoDBSparkTransform")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, lit, coalesce

# Step 1: load the input collection configured on the SparkSession.
# Column-name resolution is made case-sensitive before reading.
spark.conf.set("spark.sql.caseSensitive", True)

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Step 2: target schema of the output (column names and types)
schema = StructType([
    StructField("EntityName", StringType(), True),
    StructField("EntityCode", StringType(), True),
    StructField("SalesAndRevenue", DoubleType(), True),
    StructField("GrossProfit", DoubleType(), True),
    StructField("ProfitFromOperation", DoubleType(), True),
    StructField("ProfitLoss", DoubleType(), True),
    StructField("CashAndCashEquivalents", DoubleType(), True),
    StructField("Assets", DoubleType(), True),
    StructField("ShortTermBankLoans", DoubleType(), True),
    StructField("LongTermBankLoans", DoubleType(), True),
    StructField("EquityAttributableToEquityOwnersOfParentEntity", DoubleType(), True),
    StructField("NetCashFlowOp", DoubleType(), True),
    StructField("NetCashFlowInv", DoubleType(), True),
    StructField("NetCashFlowFin", DoubleType(), True)
])

# Output columns whose source field inside `xbrl_data` has a different name;
# every other output column reads the `xbrl_data` field of the same name.
xbrl_source_names = {
    "NetCashFlowOp": "NetCashFlowsReceivedFromUsedInOperatingActivities",
    "NetCashFlowInv": "NetCashFlowsReceivedFromUsedInInvestingActivities",
    "NetCashFlowFin": "NetCashFlowsReceivedFromUsedInFinancingActivities",
}

# Fields actually present in the loaded `xbrl_data` struct. Referencing a field
# that is not in the struct raises at analysis time, so availability must be
# checked before building the column expression.
xbrl_type = df.schema["xbrl_data"].dataType if "xbrl_data" in df.columns else None
xbrl_fields = set(xbrl_type.fieldNames()) if isinstance(xbrl_type, StructType) else set()

# Steps 3 + 4: pull each target column out of `xbrl_data`, or add a typed null
# column when the source field is absent. Every column is cast to the type
# declared in `schema`.
df_selected = df
missing_sources = []
for field in schema:
    source_name = xbrl_source_names.get(field.name, field.name)
    if source_name in xbrl_fields:
        source_value = col(f"xbrl_data.{source_name}")
    else:
        source_value = lit(None)
        missing_sources.append(source_name)
    df_selected = df_selected.withColumn(field.name, source_value.cast(field.dataType))

if missing_sources:
    print(f"Warning: fields not found in xbrl_data, filled with null: {', '.join(missing_sources)}")

# Step 5: keep only the target columns, in schema order
df_final = df_selected.select(schema.fieldNames())

# Step 6: (Optional) Save to local JSON if needed
# output_path = "terstrukturrr"
# df_final.write.mode("overwrite") \
#     .option("ignoreNullFields", "false") \
#     .json(output_path)


In [None]:
# Replace nulls with 0, then inspect the result before it is written out.
df_final = df_final.fillna(0)
df_final.printSchema()  # column names and types
df_final.show(5)  # first five rows


## Upload ke MongoDB

In [None]:
# Write df_final to MongoDB in overwrite mode.
# No URI/database/collection is passed here: the output settings configured
# on the SparkSession are used.
mongo_writer = (
    df_final.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("ignoreNullFields", "false")
)
mongo_writer.save()

print(f"Data successfully written to MongoDB database '{mongo_db_name}', collection '{mongo_output_collection}'")

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=88872151-d0a6-473f-a5ae-6bcca9563221' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>