Skip to content

Commit

Permalink
Added notebook for running Simple Model with large data (#1368)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsubbaraj-spiral committed Jun 2, 2023
1 parent 79f9259 commit 39437ad
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions examples/system-tests/Simple_Model_Large_Data.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "bc7d2d08",
"metadata": {},
"outputs": [],
"source": [
"import aqueduct as aq\n",
"from aqueduct.constants.enums import ArtifactType"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9460bb78",
"metadata": {},
"outputs": [],
"source": [
"client = aq.Client(api_key=\"\", aqueduct_address=\"\")\n",
"\n",
"\n",
"aq.global_config({'engine':'databricks_resource', 'lazy':True})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "eb9e417e",
"metadata": {},
"outputs": [],
"source": [
"#This is working with large data > 50GB.\n",
"snowflake_warehouse = client.resource('snowflake_resource')\n",
"hotel_reviews = snowflake_warehouse.sql('SELECT * FROM large_hotel_reviews;')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fa652f83",
"metadata": {},
"outputs": [],
"source": [
"@aq.op(requirements=[])\n",
"def dummy(original_df, num_rows):\n",
" from pyspark.sql.functions import monotonically_increasing_id\n",
" from pyspark.sql.functions import rand\n",
" import math\n",
" \n",
" original_row_count = original_df.count()\n",
" num_partitions = int(math.ceil(num_rows / original_row_count))\n",
"\n",
" # Step 2: Repartition the DataFrame\n",
" replicated_df = original_df.repartition(num_partitions)\n",
"\n",
" # Step 3: Persist the DataFrame\n",
" replicated_df.persist()\n",
"\n",
" # Step 4: Duplicate the rows\n",
" while replicated_df.count() < num_rows:\n",
" replicated_df = replicated_df.union(replicated_df)\n",
"\n",
" print(replicated_df.count())\n",
"\n",
" return replicated_df\n",
"\n",
"generated_df = dummy(hotel_reviews, 10000000000)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "324fb630",
"metadata": {},
"outputs": [],
"source": [
"snowflake_warehouse.save(generated_df, table_name=\"large_hotel_reviews\", update_mode=\"replace\")\n",
"\n",
"\n",
"client.publish_flow(\n",
" \"Creating_Large_Dataset\",\n",
" \"repartition hotel_reviews to create big dataset\",\n",
" artifacts=[generated_df],\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7c9e16e2",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit 39437ad

Please sign in to comment.