
In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat_ws, col


# Initialize SparkSession for Databricks
# In Databricks, the SparkSession is typically pre-configured,
# so you don't need to explicitly set Hadoop Azure dependencies.
spark = (
    SparkSession.builder
    .appName("TF-IDF-Document-Classification")
    .getOrCreate()
)



# Load the documents (potentially millions) into a Spark DataFrame.
# Adjust the path to your Databricks environment. Data in DBFS or under a mount point
# uses a path such as "/mnt/raw/documents.json"; data read directly from ADLS Gen2 uses
# an abfss:// URI of the form "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>".
#df = spark.read.json("/mnt/raw/documents.json")

#df = spark.read.option("header", "true").csv("abfss://docsclasscontainer@docsclassstorageaccount.dfs.core.windows.net/tax_statistics_dataset - Copy - Copy.csv")
#df.display()


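# OAuth (service principal) settings for ADLS Gen2.
# Do not hard-code the client secret in a notebook: store it in a Databricks secret scope
# and read it at run time. "<secret-scope>" and "<client-secret-key>" are placeholders.
client_secret = dbutils.secrets.get(scope="<secret-scope>", key="<client-secret-key>")

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "71127fdf-6f8b-4a6d-942a-40f8c9a6e754",
  "fs.azure.account.oauth2.client.secret": client_secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/115ee48c-5146-4054-9f33-83e2bfe089fd/oauth2/token"
}

# Suffix each key with the account's DFS endpoint so the settings apply only to this storage account.
# (This loop also sets fs.azure.account.auth.type, so no separate call is needed.)
for key, val in configs.items():
    spark.conf.set(f"{key}.docsclassstorageaccount.dfs.core.windows.net", val)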


df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("abfss://docsclasscontainer@docsclassstorageaccount.dfs.core.windows.net/tax_statistics_dataset - Copy - Copy.csv")
# Rename columns to remove characters that Delta tables (the default format for saveAsTable
# on Databricks) reject in column names: spaces become underscores; , ; { } ( ) \n \t = are dropped.
import re
df = df.toDF(*[re.sub(r"[,;{}()\n\t=]", "", c.replace(" ", "_")) for c in df.columns])

df.write.mode("overwrite").saveAsTable("tax_statistics_dataset")

df = spark.read.table("tax_statistics_dataset")
display(df)

# Combine text columns
df = df.withColumn("combined_text", concat_ws(" ", col("Size_of_adjusted_gross_income"), col("STATE")))
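
# Sketch (pure Python, for illustration only): concat_ws joins its arguments with the
# separator and skips nulls, so a row whose STATE is null still gets a combined_text
# built from the remaining column. `concat_ws_py` is a hypothetical helper, not part of PySpark.
def concat_ws_py(sep, *values):
    """Join the non-None values with `sep`, mirroring concat_ws on string inputs."""
    return sep.join(str(v) for v in values if v is not None)

# Example with made-up values: concat_ws_py(" ", "bracket", None, "XX") returns "bracket XX".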

df.printSchema()
display(df.select("Size_of_adjusted_gross_income", "STATE", "combined_text").limit(10))

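# Create the label column: StringIndexer maps each distinct 'combined_text' value
# (income bracket + state) to a numeric class index.
# Note: the features below are also built from 'combined_text', so the label can be read
# directly off the input text; this setup demonstrates the pipeline mechanics rather than
# a realistic classification task.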
label_indexer = StringIndexer(inputCol="combined_text", outputCol="indexedLabel").fit(df)

# Tokenize
# Tokenizer lowercases the text in inputCol and splits it on whitespace into individual words (tokens).
# inputCol="combined_text": the column that contains the text to tokenize.
# outputCol="words": the new column added to the DataFrame.
# It holds an array of the words extracted from combined_text.

tokenizer = Tokenizer(inputCol="combined_text", outputCol="words")
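
# Sketch (pure Python, for illustration only) of what Tokenizer does to one string:
# lowercase, then split on whitespace. `simple_tokenize` is a hypothetical helper.
# One difference: str.split() collapses runs of whitespace, whereas Spark's Tokenizer
# splits on each single whitespace character and can emit empty tokens; in Spark,
# RegexTokenizer (default pattern "\\s+") avoids those empty tokens.
def simple_tokenize(text):
    """Lowercase `text` and split it on runs of whitespace."""
    return text.lower().split()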

# Term Frequency step
# This step converts each array of words (the Tokenizer output from the previous step) into a numerical feature vector.
# It uses the hashing trick: each word is hashed to an index in a fixed-size vector, so no vocabulary has to be stored.

# inputCol="words": takes the array of words from the previous step as input.

# outputCol="rawFeatures": creates a new column containing the raw term-frequency vectors.
# Each element of a vector is the count of the words in the document that hash to that index.

# numFeatures=1000: the size of the feature vector. A larger value reduces hash collisions (different words sharing an index) but increases memory usage.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
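
# Sketch (pure Python, for illustration only) of the hashing trick: each token is hashed
# to a bucket in [0, num_features) and that bucket's count is incremented.
# Spark's HashingTF uses MurmurHash3; this sketch swaps in zlib.crc32, so bucket positions
# will NOT match Spark's. `hashing_tf` is a hypothetical helper.
import zlib

def hashing_tf(tokens, num_features=1000):
    """Return a dense term-frequency list of length num_features."""
    vec = [0] * num_features
    for tok in tokens:
        bucket = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[bucket] += 1  # distinct tokens that collide share (and add to) one bucket
    return vec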

# Inverse Document Frequency step
# This step re-weights the raw term frequencies. Words that appear in many documents (like "the", "a", "is")
# get less weight, and words concentrated in a few documents get more.
# IDF is an estimator: the document frequencies are computed when the pipeline is fit.

# inputCol="rawFeatures": takes the raw term-frequency vectors from the HashingTF step as input.

# outputCol="features": creates a new column containing the final TF-IDF feature vectors.
# In these vectors, each word's weight combines its frequency in the document
# with its rarity across the entire dataset.
idf = IDF(inputCol="rawFeatures", outputCol="features")
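
# Sketch (pure Python, for illustration only): Spark's IDF weights each term t by
# idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the
# number of documents containing t, and multiplies that into the term frequency.
# Here terms are plain strings; in the pipeline they are hashed indices from HashingTF.
# `tf_idf` is a hypothetical helper, not a PySpark API.
import math
from collections import Counter

def tf_idf(docs):
    """docs: list of token lists -> list of {term: tf * idf} dicts."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    idf_weight = {t: math.log((m + 1) / (n + 1)) for t, n in doc_freq.items()}
    return [{t: c * idf_weight[t] for t, c in Counter(doc).items()} for doc in docs]

# A term that occurs in every document gets idf = ln(1) = 0, so it contributes nothing.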


# Together, these three steps (tokenize, hash term frequencies, apply IDF) turn the raw text into numerical
# feature vectors ("features") that a model such as Logistic Regression can use for document classification.
# This is a common technique in natural language processing for preparing text for machine learning algorithms.
# Logistic Regression
# Logistic Regression is a linear model. In its basic form it predicts the probability of a binary outcome;
# when the label has more than two classes, as here, Spark's default family="auto" fits a multinomial (softmax) model instead.
# It is trained on the TF-IDF features to classify each row into one of the indexed label classes.

# maxIter=20: This parameter sets the maximum number of iterations the optimization algorithm will
# run to find the model's coefficients. A higher number of iterations might lead to a better fit but can also increase training time.

# regParam=0.01: This parameter controls the amount of regularization applied to the model.
# Regularization helps prevent overfitting by adding a penalty to the model's complexity.
# A value of 0.01 indicates a small amount of regularization.

# labelCol="indexedLabel": the column in the DataFrame that contains the target variable (labels).
# It is the numerical column created by the StringIndexer at the start of the pipeline.
# featuresCol is left at its default, "features", which matches the IDF output column.
# The model learns to predict these labels from the input features.
lr = LogisticRegression(maxIter=20, regParam=0.01, labelCol="indexedLabel")
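
# Sketch (pure Python, for illustration only): a multinomial logistic regression computes
# one linear score per class and turns the scores into probabilities with softmax; the
# class with the highest probability becomes the "prediction". `softmax` is a hypothetical
# helper and any scores passed to it here are made-up inputs.
import math

def softmax(scores):
    """Turn a list of per-class scores into probabilities that sum to 1."""
    top = max(scores)  # subtract the max before exp() for numerical stability
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]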

# Convert indexed labels back to original labels for predictions
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=label_indexer.labels)
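
# Sketch (pure Python, for illustration only) of the StringIndexer / IndexToString round trip.
# With the default stringOrderType="frequencyDesc", the most frequent label gets index 0.0;
# recent Spark versions break frequency ties alphabetically. IndexToString looks each
# predicted index up in the same labels list. `fit_label_index` and `index_to_label`
# are hypothetical helpers.
from collections import Counter

def fit_label_index(values):
    """Return the labels ordered by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

def index_to_label(labels, index):
    # Predictions are doubles (e.g. 2.0), so convert to int before the lookup.
    return labels[int(index)]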


# Build pipeline
pipeline = Pipeline(stages=[label_indexer, tokenizer, hashingTF, idf, lr, label_converter])

# Train model
model = pipeline.fit(df)

# Predict
predictions = model.transform(df)

# Display predictions
display(predictions.select("Size_of_adjusted_gross_income", "STATE", "combined_text", "indexedLabel", "prediction", "predictedLabel").limit(20))
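
The cell above scores the same rows it was trained on, so accuracy read off `predictions` will be optimistic. In Spark, a common route is to split with `df.randomSplit([0.8, 0.2], seed=42)`, fit the pipeline on the first part, and score the second with `MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")` from `pyspark.ml.evaluation`. The pure-Python sketch below shows the same idea under stated assumptions: rows are dicts with an `id` key, the hold-out share is 20%, and `holdout_split` and `accuracy` are hypothetical helpers. Hashing the id makes the split stable across runs.

```python
import zlib

def holdout_split(rows, test_percent=20, key="id"):
    """Deterministically assign each row to train or test by hashing its key."""
    train, test = [], []
    for row in rows:
        bucket = zlib.crc32(str(row[key]).encode("utf-8")) % 100
        (test if bucket < test_percent else train).append(row)
    return train, test

def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the two agree."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no predictions to score")
    return sum(1 for label, pred in pairs if label == pred) / len(pairs)

rows = [{"id": i} for i in range(1000)]
train, test = holdout_split(rows)
print(len(train), len(test))
print(accuracy([(0.0, 0.0), (1.0, 2.0), (2.0, 2.0), (3.0, 3.0)]))  # 0.75
```

Because the split depends only on each row's id, re-running the notebook puts every row in the same partition, which keeps evaluation numbers comparable between runs.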

In [None]:
from pyspark.sql import Row

# Replace with the actual content read from your document
document_content = """
This is the content of your document.
It can have multiple lines and paragraphs.
You would read this from '/content/Transfer Priciing Application.txt'


How to define a transfer pricing methodology?

Transfer pricing methodology is an approach to determining the transfer price.

Let me give you an example.
Example
Imagine the Vista Group - a multinational company that manufactures and distributes widgets. There
are three key business processes:
manufacturing,
distribution and
sale to the final customer.

Usually, modern business involves the registration of separate companies,
or entities, that form a group of companies.

It is often the case that these companies are located in different countries, and local
authorities control the tax compliance of these entities. In essence, tax authorities want to
ensure that the business profits are allocated to the parts of the supply chain that generate value.

One of the most critical aspects of international tax compliance is transfer pricing - the process of establishing prices in intercompany transactions.

In our example, there are two intercompany transactions -
The manufacturing company sells widgets to the Distribution company,
and the Distribution company sells goods to a sales company.

In both transactions, Vista Group needs to establish pricing methodologies that comply with transfer pricing regulations.

For example, the manufacturing entity can price goods based on the manufacturing cost and a mark-up, and
the distribution company will determine prices on a resale margin basis.

That is what we call transfer pricing methodologies - the approaches to setting the transfer price.

In real life, multinational companies have lots of different transactions,
including intragroup services, loans, financial transactions, and asset transfers.

The transfer pricing management system aims to set, track and control transfer pricing models
or methodologies, and ensure the consistency and central monitoring of the transfer pricing
process.

Without a proper framework, the control gap can grow substantially as
the size and complexity of the company increase.

Two approaches to transfer pricing

While tax managers can manage transfer pricing methodologies in a traditional way using Excel spreadsheets, Word documents and emails,
there is a better approach called digital transfer pricing.

A dedicated digital workflow is needed that allows defining and controlling TP methodologies in any multinational company.

The process includes 8 steps:
Defining TP models
Specifying functions, assets and risks
Adding definitions
Selecting transfer pricing methods
Mapping transactions to TP models
Mapping TP attributes, such as benchmarking studies
Identifying profit level indicators
Validating the arm’s length nature of the transaction

Digital transfer pricing applications are built around the workflows
that allow defining TP methodologies in a structured and consistent way.

When defining the TP methodologies, the first step is to add the TP models.

TP models are particular methodologies applied to a set of transactions.
Then, indicate functions, assets and risks; in other words, perform the functional analysis.

Next, specify the definitions. For example, one of the sides of the TP model may be a contract
manufacturer, while in the other model it may be a limited or fully fledged distributor.

In the following step, you need to select transfer pricing methods
and indicate appropriate characteristics, such as the tested party and the profit level indicator used.

Next, map the transactions to TP models. In this step,
you indicate which transactions were priced in accordance with the pre-set TP methodology.

Map TP attributes, like benchmarking studies, to transactions. This
lets the application know which benchmarks are relevant for price setting or price testing.

Next, indicate profit level indicators. These can be ex-ante or ex-post results.

And, at the end, validate the arm’s length nature of transactions,
ensuring the prices are in line with transfer pricing methodologies.

Why go digital
By using a digital transfer pricing platform, you do not just solve one problem, like the preparation of
transfer pricing documentation. Instead, you build an end-to-end system that enables full real-time
control over your transfer pricing, where all applications are integrated and synchronized.

"""

# Create a list of Rows, each representing a document
# For a single document, you'll have one row.
# You would typically determine the 'label' based on your classification task.
# Here, we use a placeholder label.
data = [Row(id="doc1", text=document_content, label="unknown")]

# Create the Spark DataFrame
sample_df = spark.createDataFrame(data)

# Display the sample DataFrame
display(sample_df)
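
The cell above pastes a single document into a string. To load many documents instead, one option is to read each file into an `(id, text, label)` tuple and pass the list to `spark.createDataFrame(rows, ["id", "text", "label"])`; Spark can also read whole files directly with `spark.read.text(path, wholetext=True)`, which yields one row per file in a `value` column. The sketch below assumes plain `.txt` files in one local directory, uses each file name (without extension) as the id, and assigns the same placeholder label as above; `load_text_documents` is a hypothetical helper.

```python
from pathlib import Path

def load_text_documents(folder, label="unknown"):
    """Read every .txt file in `folder` into an (id, text, label) tuple.

    The file name without its extension becomes the id; `label` is a
    placeholder until real class labels are available.
    """
    rows = []
    for path in sorted(Path(folder).glob("*.txt")):
        rows.append((path.stem, path.read_text(encoding="utf-8"), label))
    return rows

# Assumed notebook usage (the folder path is a placeholder):
# rows = load_text_documents("/path/to/docs")
# docs_df = spark.createDataFrame(rows, ["id", "text", "label"])
```

Sorting the paths keeps the row order stable between runs, which makes the resulting DataFrame easier to compare when re-running the notebook.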