In [0]:
pip install jinja2

In [0]:
%restart_python

dbutils.library.restartPython()

In [0]:
from jinja2 import Template

In [0]:
query_text = """
SELECT 
    {% for param in parameters %}
        {{ param.cols }}
        {% if not loop.last %}
        ,
           {% endif %}
    {% endfor %}
  FROM 
      {% for param in parameters %}
         { if loop.first %}
            {{ param['table'] }} as {{ param['alias'] }}
       {% endif %}
    {% endfor %}
    {% for param in parameters %}
    { if not loop.first %}
    LEFT JOIN 
        {{ param['table'] }} as {{ param['alias'] }}
    ON 
           {{ param['condition'] }}
        {% endif %}
    {% endfor %}



"""

## MANI PARAMETERE

In [0]:
parameters = [
    {
        "table": "spotify_data.silver.factstream",
        "alias": "factstream",
        "cols": "factstream.stream_id, factstream.listen_duration"
    },

    {
        "table": "spotify_data.silver.dimuser",
        "alias": "dimuser",
        "cols": "dimuser.user_id, dimuser.user_name",
        "condition": "factstream.user_id = dimuser.user_id"
    },
    {
        "table": "spotify_data.silver.dimtrack",
        "alias": "dimtrack",
        "cols": "dimtrack.track_id, dimtrack.track_name",
        "condition": "factstream.track_id = dimtrack.track_id"
    }
]

## CORRECT CODE

In [0]:
query_text = """
SELECT
    {% for param in parameters %}
        {{ param['cols'] }}{% if not loop.last %}, {% endif %}
    {% endfor %}
FROM
    {{ parameters[0]['table'] }} AS {{ parameters[0]['alias'] }}
{% for param in parameters[1:] %}
LEFT JOIN
    {{ param['table'] }} AS {{ param['alias'] }}
ON
    {{ param['condition'] }}
{% endfor %}
"""

In [0]:
jinja_sql_str = Template(query_text)  # Ensure query_text has correct Jinja syntax with proper if-endfor blocks.
query = jinja_sql_str.render(parameters=parameters)
print(query)

In [0]:
query_text = """
SELECT 
    {% for param in parameters %}
        {{ param.cols }}{% if not loop.last %}, {% endif %}
    {% endfor %}
FROM 
    {{ parameters[0]['table'] }} as {{ parameters[0]['alias'] }}
{% for param in parameters[1:] %}
    LEFT JOIN {{ param['table'] }} as {{ param['alias'] }}
        ON {{ param['condition'] }}
{% endfor %}
"""

In [0]:
display(spark.sql(query))  # Check and correct the 'query' string for syntax errors before execution.


In [0]:
df_main = display(spark.sql(query))

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
deduped_df = df_main.dropDuplicates([
    "stream_id", "user_id", "user_name", "track_id", "track_name"
])
display(deduped_df)